Map peers by ip only (ignoring port unless on loopback ip) (#2540)

* wip

* big refactor, regretting doing this now

* PeerAddr everywhere

* cleanup

* fixup server tests

* peers api working for GET, POST is still WIP

* we can now ban/unban peers by ip only (port optional)
This commit is contained in:
Antioch Peverell 2019-02-18 12:15:32 +00:00 committed by GitHub
parent dc6542d82b
commit 23cb9e2514
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 373 additions and 422 deletions

View file

@ -14,7 +14,7 @@
use super::utils::w; use super::utils::w;
use crate::p2p; use crate::p2p;
use crate::p2p::types::{PeerInfoDisplay, ReasonForBan}; use crate::p2p::types::{PeerAddr, PeerInfoDisplay, ReasonForBan};
use crate::router::{Handler, ResponseFuture}; use crate::router::{Handler, ResponseFuture};
use crate::web::*; use crate::web::*;
use hyper::{Body, Request, StatusCode}; use hyper::{Body, Request, StatusCode};
@ -57,16 +57,25 @@ pub struct PeerHandler {
impl Handler for PeerHandler { impl Handler for PeerHandler {
fn get(&self, req: Request<Body>) -> ResponseFuture { fn get(&self, req: Request<Body>) -> ResponseFuture {
let command = right_path_element!(req); let command = right_path_element!(req);
if let Ok(addr) = command.parse() {
match w(&self.peers).get_peer(addr) { // We support both "ip" and "ip:port" here for peer_addr.
Ok(peer) => json_response(&peer), // "ip:port" is only really useful for local usernet testing on loopback address.
Err(_) => response(StatusCode::NOT_FOUND, "peer not found"), // Normally we map peers to ip and only allow a single peer per ip address.
} let peer_addr;
if let Ok(ip_addr) = command.parse() {
peer_addr = PeerAddr::from_ip(ip_addr);
} else if let Ok(addr) = command.parse() {
peer_addr = PeerAddr(addr);
} else { } else {
response( return response(
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
format!("peer address unrecognized: {}", req.uri().path()), format!("peer address unrecognized: {}", req.uri().path()),
) );
}
match w(&self.peers).get_peer(peer_addr) {
Ok(peer) => json_response(&peer),
Err(_) => response(StatusCode::NOT_FOUND, "peer not found"),
} }
} }
fn post(&self, req: Request<Body>) -> ResponseFuture { fn post(&self, req: Request<Body>) -> ResponseFuture {
@ -77,20 +86,23 @@ impl Handler for PeerHandler {
}; };
let addr = match path_elems.next() { let addr = match path_elems.next() {
None => return response(StatusCode::BAD_REQUEST, "invalid url"), None => return response(StatusCode::BAD_REQUEST, "invalid url"),
Some(a) => match a.parse() { Some(a) => {
Err(e) => { if let Ok(ip_addr) = a.parse() {
PeerAddr::from_ip(ip_addr)
} else if let Ok(addr) = a.parse() {
PeerAddr(addr)
} else {
return response( return response(
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
format!("invalid peer address: {}", e), format!("invalid peer address: {}", req.uri().path()),
); );
} }
Ok(addr) => addr, }
},
}; };
match command { match command {
"ban" => w(&self.peers).ban_peer(&addr, ReasonForBan::ManualBan), "ban" => w(&self.peers).ban_peer(addr, ReasonForBan::ManualBan),
"unban" => w(&self.peers).unban_peer(&addr), "unban" => w(&self.peers).unban_peer(addr),
_ => return response(StatusCode::BAD_REQUEST, "invalid command"), _ => return response(StatusCode::BAD_REQUEST, "invalid command"),
}; };

View file

@ -5,9 +5,9 @@ extern crate grin_core;
extern crate grin_p2p; extern crate grin_p2p;
use grin_core::ser; use grin_core::ser;
use grin_p2p::msg::SockAddr; use grin_p2p::types::PeerAddr;
fuzz_target!(|data: &[u8]| { fuzz_target!(|data: &[u8]| {
let mut d = data.clone(); let mut d = data.clone();
let _t: Result<SockAddr, ser::Error> = ser::deserialize(&mut d); let _t: Result<PeerAddr, ser::Error> = ser::deserialize(&mut d);
}); });

View file

@ -22,11 +22,9 @@ use rand::{thread_rng, Rng};
use crate::core::core::hash::Hash; use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::msg::{ use crate::msg::{read_message, write_message, Hand, Shake, Type, PROTOCOL_VERSION, USER_AGENT};
read_message, write_message, Hand, Shake, SockAddr, Type, PROTOCOL_VERSION, USER_AGENT,
};
use crate::peer::Peer; use crate::peer::Peer;
use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerInfo, PeerLiveInfo}; use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerAddr, PeerInfo, PeerLiveInfo};
/// Local generated nonce for peer connecting. /// Local generated nonce for peer connecting.
/// Used for self-connecting detection (on receiver side), /// Used for self-connecting detection (on receiver side),
@ -44,7 +42,7 @@ pub struct Handshake {
/// a node id. /// a node id.
nonces: Arc<RwLock<VecDeque<u64>>>, nonces: Arc<RwLock<VecDeque<u64>>>,
/// Ring buffer of self addr(s) collected from PeerWithSelf detection (by nonce). /// Ring buffer of self addr(s) collected from PeerWithSelf detection (by nonce).
pub addrs: Arc<RwLock<VecDeque<SocketAddr>>>, pub addrs: Arc<RwLock<VecDeque<PeerAddr>>>,
/// The genesis block header of the chain seen by this node. /// The genesis block header of the chain seen by this node.
/// We only want to connect to other nodes seeing the same chain (forks are /// We only want to connect to other nodes seeing the same chain (forks are
/// ok). /// ok).
@ -67,13 +65,13 @@ impl Handshake {
&self, &self,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
self_addr: SocketAddr, self_addr: PeerAddr,
conn: &mut TcpStream, conn: &mut TcpStream,
) -> Result<PeerInfo, Error> { ) -> Result<PeerInfo, Error> {
// prepare the first part of the handshake // prepare the first part of the handshake
let nonce = self.next_nonce(); let nonce = self.next_nonce();
let peer_addr = match conn.peer_addr() { let peer_addr = match conn.peer_addr() {
Ok(pa) => pa, Ok(pa) => PeerAddr(pa),
Err(e) => return Err(Error::Connection(e)), Err(e) => return Err(Error::Connection(e)),
}; };
@ -83,8 +81,8 @@ impl Handshake {
nonce: nonce, nonce: nonce,
genesis: self.genesis, genesis: self.genesis,
total_difficulty: total_difficulty, total_difficulty: total_difficulty,
sender_addr: SockAddr(self_addr), sender_addr: self_addr,
receiver_addr: SockAddr(peer_addr), receiver_addr: peer_addr,
user_agent: USER_AGENT.to_string(), user_agent: USER_AGENT.to_string(),
}; };
@ -118,7 +116,7 @@ impl Handshake {
// If denied then we want to close the connection // If denied then we want to close the connection
// (without providing our peer with any details why). // (without providing our peer with any details why).
if Peer::is_denied(&self.config, &peer_info.addr) { if Peer::is_denied(&self.config, peer_info.addr) {
return Err(Error::ConnectionClose); return Err(Error::ConnectionClose);
} }
@ -155,7 +153,7 @@ impl Handshake {
} else { } else {
// check the nonce to see if we are trying to connect to ourselves // check the nonce to see if we are trying to connect to ourselves
let nonces = self.nonces.read(); let nonces = self.nonces.read();
let addr = extract_ip(&hand.sender_addr.0, &conn); let addr = resolve_peer_addr(hand.sender_addr, &conn);
if nonces.contains(&hand.nonce) { if nonces.contains(&hand.nonce) {
// save ip addresses of ourselves // save ip addresses of ourselves
let mut addrs = self.addrs.write(); let mut addrs = self.addrs.write();
@ -171,7 +169,7 @@ impl Handshake {
let peer_info = PeerInfo { let peer_info = PeerInfo {
capabilities: hand.capabilities, capabilities: hand.capabilities,
user_agent: hand.user_agent, user_agent: hand.user_agent,
addr: extract_ip(&hand.sender_addr.0, &conn), addr: resolve_peer_addr(hand.sender_addr, &conn),
version: hand.version, version: hand.version,
live_info: Arc::new(RwLock::new(PeerLiveInfo { live_info: Arc::new(RwLock::new(PeerLiveInfo {
total_difficulty: hand.total_difficulty, total_difficulty: hand.total_difficulty,
@ -186,7 +184,7 @@ impl Handshake {
// so check if we are configured to explicitly allow or deny it. // so check if we are configured to explicitly allow or deny it.
// If denied then we want to close the connection // If denied then we want to close the connection
// (without providing our peer with any details why). // (without providing our peer with any details why).
if Peer::is_denied(&self.config, &peer_info.addr) { if Peer::is_denied(&self.config, peer_info.addr) {
return Err(Error::ConnectionClose); return Err(Error::ConnectionClose);
} }
@ -219,28 +217,12 @@ impl Handshake {
} }
} }
// Attempts to make a best guess at the correct remote IP by checking if the /// Resolve the correct peer_addr based on the connection and the advertised port.
// advertised address is the loopback and our TCP connection. Note that the fn resolve_peer_addr(advertised: PeerAddr, conn: &TcpStream) -> PeerAddr {
// port reported by the connection is always incorrect for receiving let port = advertised.0.port();
// connections as it's dynamically allocated by the server. if let Ok(addr) = conn.peer_addr() {
fn extract_ip(advertised: &SocketAddr, conn: &TcpStream) -> SocketAddr { PeerAddr(SocketAddr::new(addr.ip(), port))
match advertised { } else {
&SocketAddr::V4(v4sock) => { advertised
let ip = v4sock.ip();
if ip.is_loopback() || ip.is_unspecified() {
if let Ok(addr) = conn.peer_addr() {
return SocketAddr::new(addr.ip(), advertised.port());
}
}
}
&SocketAddr::V6(v6sock) => {
let ip = v6sock.ip();
if ip.is_loopback() || ip.is_unspecified() {
if let Ok(addr) = conn.peer_addr() {
return SocketAddr::new(addr.ip(), advertised.port());
}
}
}
} }
advertised.clone()
} }

View file

@ -52,6 +52,6 @@ pub use crate::peers::Peers;
pub use crate::serv::{DummyAdapter, Server}; pub use crate::serv::{DummyAdapter, Server};
pub use crate::store::{PeerData, State}; pub use crate::store::{PeerData, State};
pub use crate::types::{ pub use crate::types::{
Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerInfo, ReasonForBan, Seeding, Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, Seeding, TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
}; };

View file

@ -16,7 +16,6 @@
use num::FromPrimitive; use num::FromPrimitive;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::time; use std::time;
use crate::core::core::hash::Hash; use crate::core::core::hash::Hash;
@ -25,7 +24,7 @@ use crate::core::pow::Difficulty;
use crate::core::ser::{self, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer}; use crate::core::ser::{self, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer};
use crate::core::{consensus, global}; use crate::core::{consensus, global};
use crate::types::{ use crate::types::{
Capabilities, Error, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
}; };
use crate::util::read_write::read_exact; use crate::util::read_write::read_exact;
@ -254,9 +253,9 @@ pub struct Hand {
/// may be needed /// may be needed
pub total_difficulty: Difficulty, pub total_difficulty: Difficulty,
/// network address of the sender /// network address of the sender
pub sender_addr: SockAddr, pub sender_addr: PeerAddr,
/// network address of the receiver /// network address of the receiver
pub receiver_addr: SockAddr, pub receiver_addr: PeerAddr,
/// name of version of the software /// name of version of the software
pub user_agent: String, pub user_agent: String,
} }
@ -283,8 +282,8 @@ impl Readable for Hand {
let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64); let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64);
let capabilities = Capabilities::from_bits_truncate(capab); let capabilities = Capabilities::from_bits_truncate(capab);
let total_diff = Difficulty::read(reader)?; let total_diff = Difficulty::read(reader)?;
let sender_addr = SockAddr::read(reader)?; let sender_addr = PeerAddr::read(reader)?;
let receiver_addr = SockAddr::read(reader)?; let receiver_addr = PeerAddr::read(reader)?;
let ua = reader.read_bytes_len_prefix()?; let ua = reader.read_bytes_len_prefix()?;
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
let genesis = Hash::read(reader)?; let genesis = Hash::read(reader)?;
@ -373,7 +372,7 @@ impl Readable for GetPeerAddrs {
/// GetPeerAddrs. /// GetPeerAddrs.
#[derive(Debug)] #[derive(Debug)]
pub struct PeerAddrs { pub struct PeerAddrs {
pub peers: Vec<SockAddr>, pub peers: Vec<PeerAddr>,
} }
impl Writeable for PeerAddrs { impl Writeable for PeerAddrs {
@ -394,10 +393,9 @@ impl Readable for PeerAddrs {
} else if peer_count == 0 { } else if peer_count == 0 {
return Ok(PeerAddrs { peers: vec![] }); return Ok(PeerAddrs { peers: vec![] });
} }
// let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader));
let mut peers = Vec::with_capacity(peer_count as usize); let mut peers = Vec::with_capacity(peer_count as usize);
for _ in 0..peer_count { for _ in 0..peer_count {
peers.push(SockAddr::read(reader)?); peers.push(PeerAddr::read(reader)?);
} }
Ok(PeerAddrs { peers: peers }) Ok(PeerAddrs { peers: peers })
} }
@ -431,58 +429,6 @@ impl Readable for PeerError {
} }
} }
/// Only necessary so we can implement Readable and Writeable. Rust disallows
/// implementing traits when both types are outside of this crate (which is the
/// case for SocketAddr and Readable/Writeable).
#[derive(Debug)]
pub struct SockAddr(pub SocketAddr);
impl Writeable for SockAddr {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
match self.0 {
SocketAddr::V4(sav4) => {
ser_multiwrite!(
writer,
[write_u8, 0],
[write_fixed_bytes, &sav4.ip().octets().to_vec()],
[write_u16, sav4.port()]
);
}
SocketAddr::V6(sav6) => {
writer.write_u8(1)?;
for seg in &sav6.ip().segments() {
writer.write_u16(*seg)?;
}
writer.write_u16(sav6.port())?;
}
}
Ok(())
}
}
impl Readable for SockAddr {
fn read(reader: &mut dyn Reader) -> Result<SockAddr, ser::Error> {
let v4_or_v6 = reader.read_u8()?;
if v4_or_v6 == 0 {
let ip = reader.read_fixed_bytes(4)?;
let port = reader.read_u16()?;
Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]),
port,
))))
} else {
let ip = try_iter_map_vec!(0..8, |_| reader.read_u16());
let port = reader.read_u16()?;
Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]),
port,
0,
0,
))))
}
}
}
/// Serializable wrapper for the block locator. /// Serializable wrapper for the block locator.
#[derive(Debug)] #[derive(Debug)]
pub struct Locator { pub struct Locator {

View file

@ -14,7 +14,7 @@
use crate::util::{Mutex, RwLock}; use crate::util::{Mutex, RwLock};
use std::fs::File; use std::fs::File;
use std::net::{Shutdown, SocketAddr, TcpStream}; use std::net::{Shutdown, TcpStream};
use std::sync::Arc; use std::sync::Arc;
use crate::conn; use crate::conn;
@ -25,7 +25,8 @@ use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest}; use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest};
use crate::protocol::Protocol; use crate::protocol::Protocol;
use crate::types::{ use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerInfo, ReasonForBan, TxHashSetRead, Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
}; };
use chrono::prelude::{DateTime, Utc}; use chrono::prelude::{DateTime, Utc};
@ -93,7 +94,7 @@ impl Peer {
conn: &mut TcpStream, conn: &mut TcpStream,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
self_addr: SocketAddr, self_addr: PeerAddr,
hs: &Handshake, hs: &Handshake,
na: Arc<dyn NetAdapter>, na: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> { ) -> Result<Peer, Error> {
@ -124,10 +125,9 @@ impl Peer {
self.connection = Some(Mutex::new(conn::listen(conn, handler))); self.connection = Some(Mutex::new(conn::listen(conn, handler)));
} }
pub fn is_denied(config: &P2PConfig, peer_addr: &SocketAddr) -> bool { pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool {
let peer = format!("{}:{}", peer_addr.ip(), peer_addr.port());
if let Some(ref denied) = config.peers_deny { if let Some(ref denied) = config.peers_deny {
if denied.contains(&peer) { if denied.contains(&peer_addr) {
debug!( debug!(
"checking peer allowed/denied: {:?} explicitly denied", "checking peer allowed/denied: {:?} explicitly denied",
peer_addr peer_addr
@ -136,7 +136,7 @@ impl Peer {
} }
} }
if let Some(ref allowed) = config.peers_allow { if let Some(ref allowed) = config.peers_allow {
if allowed.contains(&peer) { if allowed.contains(&peer_addr) {
debug!( debug!(
"checking peer allowed/denied: {:?} explicitly allowed", "checking peer allowed/denied: {:?} explicitly allowed",
peer_addr peer_addr
@ -566,7 +566,7 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_transaction(kernel_hash) self.adapter.get_transaction(kernel_hash)
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
self.push_recv(kernel_hash); self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr) self.adapter.tx_kernel_received(kernel_hash, addr)
} }
@ -582,23 +582,23 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.transaction_received(tx, stem) self.adapter.transaction_received(tx, stem)
} }
fn block_received(&self, b: core::Block, addr: SocketAddr, _was_requested: bool) -> bool { fn block_received(&self, b: core::Block, addr: PeerAddr, _was_requested: bool) -> bool {
let bh = b.hash(); let bh = b.hash();
self.push_recv(bh); self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh)) self.adapter.block_received(b, addr, self.has_req(bh))
} }
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
self.push_recv(cb.hash()); self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr) self.adapter.compact_block_received(cb, addr)
} }
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
self.push_recv(bh.hash()); self.push_recv(bh.hash());
self.adapter.header_received(bh, addr) self.adapter.header_received(bh, addr)
} }
fn headers_received(&self, bh: &[core::BlockHeader], addr: SocketAddr) -> bool { fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool {
self.adapter.headers_received(bh, addr) self.adapter.headers_received(bh, addr)
} }
@ -618,7 +618,7 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.txhashset_receive_ready() self.adapter.txhashset_receive_ready()
} }
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool { fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
self.adapter.txhashset_write(h, txhashset_data, peer_addr) self.adapter.txhashset_write(h, txhashset_data, peer_addr)
} }
@ -634,19 +634,19 @@ impl ChainAdapter for TrackingAdapter {
} }
impl NetAdapter for TrackingAdapter { impl NetAdapter for TrackingAdapter {
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> { fn find_peer_addrs(&self, capab: Capabilities) -> Vec<PeerAddr> {
self.adapter.find_peer_addrs(capab) self.adapter.find_peer_addrs(capab)
} }
fn peer_addrs_received(&self, addrs: Vec<SocketAddr>) { fn peer_addrs_received(&self, addrs: Vec<PeerAddr>) {
self.adapter.peer_addrs_received(addrs) self.adapter.peer_addrs_received(addrs)
} }
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { fn peer_difficulty(&self, addr: PeerAddr, diff: Difficulty, height: u64) {
self.adapter.peer_difficulty(addr, diff, height) self.adapter.peer_difficulty(addr, diff, height)
} }
fn is_banned(&self, addr: SocketAddr) -> bool { fn is_banned(&self, addr: PeerAddr) -> bool {
self.adapter.is_banned(addr) self.adapter.is_banned(addr)
} }
} }

View file

@ -15,7 +15,6 @@
use crate::util::RwLock; use crate::util::RwLock;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@ -30,14 +29,14 @@ use chrono::Duration;
use crate::peer::Peer; use crate::peer::Peer;
use crate::store::{PeerData, PeerStore, State}; use crate::store::{PeerData, PeerStore, State};
use crate::types::{ use crate::types::{
Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, ReasonForBan, Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS, TxHashSetRead, MAX_PEER_ADDRS,
}; };
pub struct Peers { pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>, pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore, store: PeerStore,
peers: RwLock<HashMap<SocketAddr, Arc<Peer>>>, peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
dandelion_relay: RwLock<Option<(i64, Arc<Peer>)>>, dandelion_relay: RwLock<Option<(i64, Arc<Peer>)>>,
config: P2PConfig, config: P2PConfig,
} }
@ -56,33 +55,25 @@ impl Peers {
/// Adds the peer to our internal peer mapping. Note that the peer is still /// Adds the peer to our internal peer mapping. Note that the peer is still
/// returned so the server can run it. /// returned so the server can run it.
pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> { pub fn add_connected(&self, peer: Arc<Peer>) -> Result<(), Error> {
let peer_data: PeerData; let peer_data = PeerData {
let addr: SocketAddr; addr: peer.info.addr,
{ capabilities: peer.info.capabilities,
peer_data = PeerData { user_agent: peer.info.user_agent.clone(),
addr: peer.info.addr, flags: State::Healthy,
capabilities: peer.info.capabilities, last_banned: 0,
user_agent: peer.info.user_agent.clone(), ban_reason: ReasonForBan::None,
flags: State::Healthy, last_connected: Utc::now().timestamp(),
last_banned: 0, };
ban_reason: ReasonForBan::None, debug!("Saving newly connected peer {}.", peer_data.addr);
last_connected: Utc::now().timestamp(),
};
addr = peer.info.addr.clone();
}
debug!("Saving newly connected peer {}.", addr);
self.save_peer(&peer_data)?; self.save_peer(&peer_data)?;
self.peers.write().insert(peer_data.addr, peer.clone());
{
let mut peers = self.peers.write();
peers.insert(addr, peer.clone());
}
Ok(()) Ok(())
} }
/// Add a peer as banned to block future connections, usually due to failed /// Add a peer as banned to block future connections, usually due to failed
/// handshake /// handshake
pub fn add_banned(&self, addr: SocketAddr, ban_reason: ReasonForBan) -> Result<(), Error> { pub fn add_banned(&self, addr: PeerAddr, ban_reason: ReasonForBan) -> Result<(), Error> {
let peer_data = PeerData { let peer_data = PeerData {
addr, addr,
capabilities: Capabilities::UNKNOWN, capabilities: Capabilities::UNKNOWN,
@ -129,18 +120,8 @@ impl Peers {
self.dandelion_relay.read().clone() self.dandelion_relay.read().clone()
} }
pub fn is_known(&self, addr: &SocketAddr) -> bool { pub fn is_known(&self, addr: PeerAddr) -> bool {
self.peers.read().contains_key(addr) self.peers.read().contains_key(&addr)
}
/// Check whether an ip address is in the active peers list, ignore the port
pub fn is_known_ip(&self, addr: &SocketAddr) -> bool {
for socket in self.peers.read().keys() {
if addr.ip() == socket.ip() {
return true;
}
}
return false;
} }
/// Get vec of peers we are currently connected to. /// Get vec of peers we are currently connected to.
@ -166,8 +147,8 @@ impl Peers {
} }
/// Get a peer we're connected to by address. /// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option<Arc<Peer>> { pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
self.peers.read().get(addr).map(|p| p.clone()) self.peers.read().get(&addr).map(|p| p.clone())
} }
/// Number of peers currently connected to. /// Number of peers currently connected to.
@ -257,31 +238,18 @@ impl Peers {
self.most_work_peers().pop() self.most_work_peers().pop()
} }
pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { pub fn is_banned(&self, peer_addr: PeerAddr) -> bool {
if global::is_production_mode() { if let Ok(peer) = self.store.get_peer(peer_addr) {
// Ban only cares about ip address, no mather what port. if peer.flags == State::Banned {
// so, we query all saved peers with one same ip address, and ignore port return true;
let peers_data = self.store.find_peers_by_ip(peer_addr);
for peer_data in peers_data {
if peer_data.flags == State::Banned {
return true;
}
}
} else {
// For travis-ci test, we need run multiple nodes in one server, with same ip address.
// so, just query the ip address and the port
if let Ok(peer_data) = self.store.get_peer(peer_addr) {
if peer_data.flags == State::Banned {
return true;
}
} }
} }
false false
} }
/// Ban a peer, disconnecting it if we're currently connected /// Ban a peer, disconnecting it if we're currently connected
pub fn ban_peer(&self, peer_addr: &SocketAddr, ban_reason: ReasonForBan) { pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) {
if let Err(e) = self.update_state(*peer_addr, State::Banned) { if let Err(e) = self.update_state(peer_addr, State::Banned) {
error!("Couldn't ban {}: {:?}", peer_addr, e); error!("Couldn't ban {}: {:?}", peer_addr, e);
} }
@ -295,12 +263,12 @@ impl Peers {
} }
/// Unban a peer, checks if it exists and banned then unban /// Unban a peer, checks if it exists and banned then unban
pub fn unban_peer(&self, peer_addr: &SocketAddr) { pub fn unban_peer(&self, peer_addr: PeerAddr) {
debug!("unban_peer: peer {}", peer_addr); debug!("unban_peer: peer {}", peer_addr);
match self.get_peer(*peer_addr) { match self.get_peer(peer_addr) {
Ok(_) => { Ok(_) => {
if self.is_banned(*peer_addr) { if self.is_banned(peer_addr) {
if let Err(e) = self.update_state(*peer_addr, State::Healthy) { if let Err(e) = self.update_state(peer_addr, State::Healthy) {
error!("Couldn't unban {}: {:?}", peer_addr, e); error!("Couldn't unban {}: {:?}", peer_addr, e);
} }
} else { } else {
@ -424,12 +392,12 @@ impl Peers {
} }
/// Get peer in store by address /// Get peer in store by address
pub fn get_peer(&self, peer_addr: SocketAddr) -> Result<PeerData, Error> { pub fn get_peer(&self, peer_addr: PeerAddr) -> Result<PeerData, Error> {
self.store.get_peer(peer_addr).map_err(From::from) self.store.get_peer(peer_addr).map_err(From::from)
} }
/// Whether we've already seen a peer with the provided address /// Whether we've already seen a peer with the provided address
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> { pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result<bool, Error> {
self.store.exists_peer(peer_addr).map_err(From::from) self.store.exists_peer(peer_addr).map_err(From::from)
} }
@ -439,7 +407,7 @@ impl Peers {
} }
/// Updates the state of a peer in store /// Updates the state of a peer in store
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> {
self.store self.store
.update_state(peer_addr, new_state) .update_state(peer_addr, new_state)
.map_err(From::from) .map_err(From::from)
@ -495,9 +463,9 @@ impl Peers {
// now clean up peer map based on the list to remove // now clean up peer map based on the list to remove
{ {
let mut peers = self.peers.write(); let mut peers = self.peers.write();
for p in rm { for addr in rm {
let _ = peers.get(&p).map(|p| p.stop()); let _ = peers.get(&addr).map(|peer| peer.stop());
peers.remove(&p); peers.remove(&addr);
} }
} }
} }
@ -558,7 +526,7 @@ impl ChainAdapter for Peers {
self.adapter.get_transaction(kernel_hash) self.adapter.get_transaction(kernel_hash)
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
self.adapter.tx_kernel_received(kernel_hash, addr) self.adapter.tx_kernel_received(kernel_hash, addr)
} }
@ -566,7 +534,7 @@ impl ChainAdapter for Peers {
self.adapter.transaction_received(tx, stem) self.adapter.transaction_received(tx, stem)
} }
fn block_received(&self, b: core::Block, peer_addr: SocketAddr, was_requested: bool) -> bool { fn block_received(&self, b: core::Block, peer_addr: PeerAddr, was_requested: bool) -> bool {
let hash = b.hash(); let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested) { if !self.adapter.block_received(b, peer_addr, was_requested) {
// if the peer sent us a block that's intrinsically bad // if the peer sent us a block that's intrinsically bad
@ -575,45 +543,45 @@ impl ChainAdapter for Peers {
"Received a bad block {} from {}, the peer will be banned", "Received a bad block {} from {}, the peer will be banned",
hash, peer_addr hash, peer_addr
); );
self.ban_peer(&peer_addr, ReasonForBan::BadBlock); self.ban_peer(peer_addr, ReasonForBan::BadBlock);
false false
} else { } else {
true true
} }
} }
fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: SocketAddr) -> bool { fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: PeerAddr) -> bool {
let hash = cb.hash(); let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr) { if !self.adapter.compact_block_received(cb, peer_addr) {
// if the peer sent us a block that's intrinsically bad // if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
debug!( debug!(
"Received a bad compact block {} from {}, the peer will be banned", "Received a bad compact block {} from {}, the peer will be banned",
hash, &peer_addr hash, peer_addr
); );
self.ban_peer(&peer_addr, ReasonForBan::BadCompactBlock); self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock);
false false
} else { } else {
true true
} }
} }
fn header_received(&self, bh: core::BlockHeader, peer_addr: SocketAddr) -> bool { fn header_received(&self, bh: core::BlockHeader, peer_addr: PeerAddr) -> bool {
if !self.adapter.header_received(bh, peer_addr) { if !self.adapter.header_received(bh, peer_addr) {
// if the peer sent us a block header that's intrinsically bad // if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
self.ban_peer(&peer_addr, ReasonForBan::BadBlockHeader); self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false false
} else { } else {
true true
} }
} }
fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: SocketAddr) -> bool { fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: PeerAddr) -> bool {
if !self.adapter.headers_received(headers, peer_addr) { if !self.adapter.headers_received(headers, peer_addr) {
// if the peer sent us a block header that's intrinsically bad // if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban // they are either mistaken or malevolent, both of which require a ban
self.ban_peer(&peer_addr, ReasonForBan::BadBlockHeader); self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false false
} else { } else {
true true
@ -636,13 +604,13 @@ impl ChainAdapter for Peers {
self.adapter.txhashset_receive_ready() self.adapter.txhashset_receive_ready()
} }
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool { fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) { if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) {
debug!( debug!(
"Received a bad txhashset data from {}, the peer will be banned", "Received a bad txhashset data from {}, the peer will be banned",
&peer_addr &peer_addr
); );
self.ban_peer(&peer_addr, ReasonForBan::BadTxHashSet); self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet);
false false
} else { } else {
true true
@ -663,14 +631,14 @@ impl ChainAdapter for Peers {
impl NetAdapter for Peers { impl NetAdapter for Peers {
/// Find good peers we know with the provided capability and return their /// Find good peers we know with the provided capability and return their
/// addresses. /// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> { fn find_peer_addrs(&self, capab: Capabilities) -> Vec<PeerAddr> {
let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize); let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize);
trace!("find_peer_addrs: {} healthy peers picked", peers.len()); trace!("find_peer_addrs: {} healthy peers picked", peers.len());
map_vec!(peers, |p| p.addr) map_vec!(peers, |p| p.addr)
} }
/// A list of peers has been received from one of our peers. /// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) { fn peer_addrs_received(&self, peer_addrs: Vec<PeerAddr>) {
trace!("Received {} peer addrs, saving.", peer_addrs.len()); trace!("Received {} peer addrs, saving.", peer_addrs.len());
for pa in peer_addrs { for pa in peer_addrs {
if let Ok(e) = self.exists_peer(pa) { if let Ok(e) = self.exists_peer(pa) {
@ -693,13 +661,13 @@ impl NetAdapter for Peers {
} }
} }
fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { fn peer_difficulty(&self, addr: PeerAddr, diff: Difficulty, height: u64) {
if let Some(peer) = self.get_connected_peer(&addr) { if let Some(peer) = self.get_connected_peer(addr) {
peer.info.update(height, diff); peer.info.update(height, diff);
} }
} }
fn is_banned(&self, addr: SocketAddr) -> bool { fn is_banned(&self, addr: PeerAddr) -> bool {
if let Ok(peer) = self.get_peer(addr) { if let Ok(peer) = self.get_peer(addr) {
peer.flags == State::Banned peer.flags == State::Banned
} else { } else {

View file

@ -16,7 +16,6 @@ use std::cmp;
use std::env; use std::env;
use std::fs::File; use std::fs::File;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use crate::conn::{Message, MessageHandler, Response}; use crate::conn::{Message, MessageHandler, Response};
@ -25,18 +24,18 @@ use crate::util::{RateCounter, RwLock};
use chrono::prelude::Utc; use chrono::prelude::Utc;
use crate::msg::{ use crate::msg::{
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, TxHashSetArchive, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive,
TxHashSetRequest, Type, TxHashSetRequest, Type,
}; };
use crate::types::{Error, NetAdapter}; use crate::types::{Error, NetAdapter, PeerAddr};
pub struct Protocol { pub struct Protocol {
adapter: Arc<dyn NetAdapter>, adapter: Arc<dyn NetAdapter>,
addr: SocketAddr, addr: PeerAddr,
} }
impl Protocol { impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, addr: SocketAddr) -> Protocol { pub fn new(adapter: Arc<dyn NetAdapter>, addr: PeerAddr) -> Protocol {
Protocol { adapter, addr } Protocol { adapter, addr }
} }
} }
@ -231,19 +230,17 @@ impl MessageHandler for Protocol {
Type::GetPeerAddrs => { Type::GetPeerAddrs => {
let get_peers: GetPeerAddrs = msg.body()?; let get_peers: GetPeerAddrs = msg.body()?;
let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities); let peers = adapter.find_peer_addrs(get_peers.capabilities);
Ok(Some(Response::new( Ok(Some(Response::new(
Type::PeerAddrs, Type::PeerAddrs,
PeerAddrs { PeerAddrs { peers },
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
},
writer, writer,
))) )))
} }
Type::PeerAddrs => { Type::PeerAddrs => {
let peer_addrs: PeerAddrs = msg.body()?; let peer_addrs: PeerAddrs = msg.body()?;
adapter.peer_addrs_received(peer_addrs.peers.iter().map(|pa| pa.0).collect()); adapter.peer_addrs_received(peer_addrs.peers);
Ok(None) Ok(None)
} }

View file

@ -29,7 +29,7 @@ use crate::peer::Peer;
use crate::peers::Peers; use crate::peers::Peers;
use crate::store::PeerStore; use crate::store::PeerStore;
use crate::types::{ use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, ReasonForBan, Seeding, TxHashSetRead, Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, TxHashSetRead,
}; };
use crate::util::{Mutex, StopState}; use crate::util::{Mutex, StopState};
use chrono::prelude::{DateTime, Utc}; use chrono::prelude::{DateTime, Utc};
@ -82,6 +82,8 @@ impl Server {
match listener.accept() { match listener.accept() {
Ok((stream, peer_addr)) => { Ok((stream, peer_addr)) => {
let peer_addr = PeerAddr(peer_addr);
if self.check_undesirable(&stream) { if self.check_undesirable(&stream) {
continue; continue;
} }
@ -107,8 +109,8 @@ impl Server {
/// Asks the server to connect to a new peer. Directly returns the peer if /// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address. /// we're already connected to the provided address.
pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<Peer>, Error> { pub fn connect(&self, addr: PeerAddr) -> Result<Arc<Peer>, Error> {
if Peer::is_denied(&self.config, &addr) { if Peer::is_denied(&self.config, addr) {
debug!("connect_peer: peer {} denied, not connecting.", addr); debug!("connect_peer: peer {} denied, not connecting.", addr);
return Err(Error::ConnectionClose); return Err(Error::ConnectionClose);
} }
@ -134,7 +136,7 @@ impl Server {
self.config.port, self.config.port,
addr addr
); );
match TcpStream::connect_timeout(addr, Duration::from_secs(10)) { match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(mut stream) => { Ok(mut stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port); let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty(); let total_diff = self.peers.total_difficulty();
@ -143,7 +145,7 @@ impl Server {
&mut stream, &mut stream,
self.capabilities, self.capabilities,
total_diff, total_diff,
addr, PeerAddr(addr),
&self.handshake, &self.handshake,
self.peers.clone(), self.peers.clone(),
)?; )?;
@ -191,13 +193,17 @@ impl Server {
/// different sets of peers themselves. In addition, it prevent potential /// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not. /// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool { fn check_undesirable(&self, stream: &TcpStream) -> bool {
// peer has been banned, go away!
if let Ok(peer_addr) = stream.peer_addr() { if let Ok(peer_addr) = stream.peer_addr() {
let banned = self.peers.is_banned(peer_addr); let peer_addr = PeerAddr(peer_addr);
let known_ip = if self.peers.is_banned(peer_addr) {
self.peers.is_known_ip(&peer_addr) && self.config.seeding_type == Seeding::DNSSeed; debug!("Peer {} banned, refusing connection.", peer_addr);
if banned || known_ip { if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Peer {} banned or known, refusing connection.", peer_addr); debug!("Error shutting down conn: {:?}", e);
}
return true;
}
if self.peers.is_known(peer_addr) {
debug!("Peer {} already known, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) { if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e); debug!("Error shutting down conn: {:?}", e);
} }
@ -234,18 +240,18 @@ impl ChainAdapter for DummyAdapter {
fn get_transaction(&self, _h: Hash) -> Option<core::Transaction> { fn get_transaction(&self, _h: Hash) -> Option<core::Transaction> {
None None
} }
fn tx_kernel_received(&self, _h: Hash, _addr: SocketAddr) {} fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) {}
fn transaction_received(&self, _: core::Transaction, _stem: bool) {} fn transaction_received(&self, _: core::Transaction, _stem: bool) {}
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { fn compact_block_received(&self, _cb: core::CompactBlock, _addr: PeerAddr) -> bool {
true true
} }
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { fn header_received(&self, _bh: core::BlockHeader, _addr: PeerAddr) -> bool {
true true
} }
fn block_received(&self, _: core::Block, _: SocketAddr, _: bool) -> bool { fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> bool {
true true
} }
fn headers_received(&self, _: &[core::BlockHeader], _: SocketAddr) -> bool { fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> bool {
true true
} }
fn locate_headers(&self, _: &[Hash]) -> Vec<core::BlockHeader> { fn locate_headers(&self, _: &[Hash]) -> Vec<core::BlockHeader> {
@ -262,7 +268,7 @@ impl ChainAdapter for DummyAdapter {
false false
} }
fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: SocketAddr) -> bool { fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: PeerAddr) -> bool {
false false
} }
@ -277,12 +283,12 @@ impl ChainAdapter for DummyAdapter {
} }
impl NetAdapter for DummyAdapter { impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> { fn find_peer_addrs(&self, _: Capabilities) -> Vec<PeerAddr> {
vec![] vec![]
} }
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {} fn peer_addrs_received(&self, _: Vec<PeerAddr>) {}
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _: u64) {} fn peer_difficulty(&self, _: PeerAddr, _: Difficulty, _: u64) {}
fn is_banned(&self, _: SocketAddr) -> bool { fn is_banned(&self, _: PeerAddr) -> bool {
false false
} }
} }

View file

@ -17,19 +17,17 @@
use chrono::Utc; use chrono::Utc;
use num::FromPrimitive; use num::FromPrimitive;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use crate::lmdb; use crate::lmdb;
use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; use crate::core::ser::{self, Readable, Reader, Writeable, Writer};
use crate::msg::SockAddr; use crate::types::{Capabilities, PeerAddr, ReasonForBan};
use crate::types::{Capabilities, ReasonForBan};
use grin_store::{self, option_to_not_found, to_key, Error}; use grin_store::{self, option_to_not_found, to_key, Error};
const STORE_SUBPATH: &'static str = "peers"; const STORE_SUBPATH: &'static str = "peers";
const PEER_PREFIX: u8 = 'p' as u8; const PEER_PREFIX: u8 = 'P' as u8;
/// Types of messages /// Types of messages
enum_from_primitive! { enum_from_primitive! {
@ -45,7 +43,7 @@ enum_from_primitive! {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerData { pub struct PeerData {
/// Network address of the peer. /// Network address of the peer.
pub addr: SocketAddr, pub addr: PeerAddr,
/// What capabilities the peer advertises. Unknown until a successful /// What capabilities the peer advertises. Unknown until a successful
/// connection. /// connection.
pub capabilities: Capabilities, pub capabilities: Capabilities,
@ -63,7 +61,7 @@ pub struct PeerData {
impl Writeable for PeerData { impl Writeable for PeerData {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> { fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
SockAddr(self.addr).write(writer)?; self.addr.write(writer)?;
ser_multiwrite!( ser_multiwrite!(
writer, writer,
[write_u32, self.capabilities.bits()], [write_u32, self.capabilities.bits()],
@ -79,7 +77,7 @@ impl Writeable for PeerData {
impl Readable for PeerData { impl Readable for PeerData {
fn read(reader: &mut dyn Reader) -> Result<PeerData, ser::Error> { fn read(reader: &mut dyn Reader) -> Result<PeerData, ser::Error> {
let addr = SockAddr::read(reader)?; let addr = PeerAddr::read(reader)?;
let capab = reader.read_u32()?; let capab = reader.read_u32()?;
let ua = reader.read_bytes_len_prefix()?; let ua = reader.read_bytes_len_prefix()?;
let (fl, lb, br) = ser_multiread!(reader, read_u8, read_i64, read_i32); let (fl, lb, br) = ser_multiread!(reader, read_u8, read_i64, read_i32);
@ -99,7 +97,7 @@ impl Readable for PeerData {
match State::from_u8(fl) { match State::from_u8(fl) {
Some(flags) => Ok(PeerData { Some(flags) => Ok(PeerData {
addr: addr.0, addr,
capabilities, capabilities,
user_agent, user_agent,
flags: flags, flags: flags,
@ -132,20 +130,20 @@ impl PeerStore {
batch.commit() batch.commit()
} }
pub fn get_peer(&self, peer_addr: SocketAddr) -> Result<PeerData, Error> { pub fn get_peer(&self, peer_addr: PeerAddr) -> Result<PeerData, Error> {
option_to_not_found( option_to_not_found(
self.db.get_ser(&peer_key(peer_addr)[..]), self.db.get_ser(&peer_key(peer_addr)[..]),
&format!("Peer at address: {}", peer_addr), &format!("Peer at address: {}", peer_addr),
) )
} }
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> { pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result<bool, Error> {
self.db.exists(&peer_key(peer_addr)[..]) self.db.exists(&peer_key(peer_addr)[..])
} }
/// TODO - allow below added to avoid github issue reports /// TODO - allow below added to avoid github issue reports
#[allow(dead_code)] #[allow(dead_code)]
pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { pub fn delete_peer(&self, peer_addr: PeerAddr) -> Result<(), Error> {
let batch = self.db.batch()?; let batch = self.db.batch()?;
batch.delete(&peer_key(peer_addr)[..])?; batch.delete(&peer_key(peer_addr)[..])?;
batch.commit() batch.commit()
@ -162,17 +160,6 @@ impl PeerStore {
peers.iter().take(count).cloned().collect() peers.iter().take(count).cloned().collect()
} }
/// Query all peers with same IP address, and ignore the port
pub fn find_peers_by_ip(&self, peer_addr: SocketAddr) -> Vec<PeerData> {
self.db
.iter::<PeerData>(&to_key(
PEER_PREFIX,
&mut format!("{}", peer_addr.ip()).into_bytes(),
))
.unwrap()
.collect::<Vec<_>>()
}
/// List all known peers /// List all known peers
/// Used for /v1/peers/all api endpoint /// Used for /v1/peers/all api endpoint
pub fn all_peers(&self) -> Vec<PeerData> { pub fn all_peers(&self) -> Vec<PeerData> {
@ -182,7 +169,7 @@ impl PeerStore {
/// Convenience method to load a peer data, update its status and save it /// Convenience method to load a peer data, update its status and save it
/// back. If new state is Banned its last banned time will be updated too. /// back. If new state is Banned its last banned time will be updated too.
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> {
let batch = self.db.batch()?; let batch = self.db.batch()?;
let mut peer = option_to_not_found( let mut peer = option_to_not_found(
@ -194,7 +181,7 @@ impl PeerStore {
peer.last_banned = Utc::now().timestamp(); peer.last_banned = Utc::now().timestamp();
} }
batch.put_ser(&peer_key(peer.addr)[..], &peer)?; batch.put_ser(&peer_key(peer_addr)[..], &peer)?;
batch.commit() batch.commit()
} }
@ -226,9 +213,7 @@ impl PeerStore {
} }
} }
fn peer_key(peer_addr: SocketAddr) -> Vec<u8> { // Ignore the port unless ip is loopback address.
to_key( fn peer_key(peer_addr: PeerAddr) -> Vec<u8> {
PEER_PREFIX, to_key(PEER_PREFIX, &mut peer_addr.as_key().into_bytes())
&mut format!("{}:{}", peer_addr.ip(), peer_addr.port()).into_bytes(),
)
} }

View file

@ -16,15 +16,18 @@ use crate::util::RwLock;
use std::convert::From; use std::convert::From;
use std::fs::File; use std::fs::File;
use std::io; use std::io;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use chrono::prelude::*; use chrono::prelude::*;
use crate::core::core;
use crate::core::core::hash::Hash; use crate::core::core::hash::Hash;
use crate::core::global;
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::core::{core, ser}; use crate::core::ser::{self, Readable, Reader, Writeable, Writer};
use grin_store; use grin_store;
/// Maximum number of block headers a peer should ever send /// Maximum number of block headers a peer should ever send
@ -95,6 +98,106 @@ impl<T> From<mpsc::TrySendError<T>> for Error {
} }
} }
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct PeerAddr(pub SocketAddr);
impl Writeable for PeerAddr {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
match self.0 {
SocketAddr::V4(sav4) => {
ser_multiwrite!(
writer,
[write_u8, 0],
[write_fixed_bytes, &sav4.ip().octets().to_vec()],
[write_u16, sav4.port()]
);
}
SocketAddr::V6(sav6) => {
writer.write_u8(1)?;
for seg in &sav6.ip().segments() {
writer.write_u16(*seg)?;
}
writer.write_u16(sav6.port())?;
}
}
Ok(())
}
}
impl Readable for PeerAddr {
fn read(reader: &mut dyn Reader) -> Result<PeerAddr, ser::Error> {
let v4_or_v6 = reader.read_u8()?;
if v4_or_v6 == 0 {
let ip = reader.read_fixed_bytes(4)?;
let port = reader.read_u16()?;
Ok(PeerAddr(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]),
port,
))))
} else {
let ip = try_iter_map_vec!(0..8, |_| reader.read_u16());
let port = reader.read_u16()?;
Ok(PeerAddr(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]),
port,
0,
0,
))))
}
}
}
impl std::hash::Hash for PeerAddr {
/// If loopback address then we care about ip and port.
/// If regular address then we only care about the ip and ignore the port.
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
if self.0.ip().is_loopback() {
self.0.hash(state);
} else {
self.0.ip().hash(state);
}
}
}
impl PartialEq for PeerAddr {
/// If loopback address then we care about ip and port.
/// If regular address then we only care about the ip and ignore the port.
fn eq(&self, other: &PeerAddr) -> bool {
if self.0.ip().is_loopback() {
self.0 == other.0
} else {
self.0.ip() == other.0.ip()
}
}
}
impl Eq for PeerAddr {}
impl std::fmt::Display for PeerAddr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl PeerAddr {
/// Convenient way of constructing a new peer_addr from an ip_addr
/// defaults to port 3414 on mainnet and 13414 on floonet.
pub fn from_ip(addr: IpAddr) -> PeerAddr {
let port = if global::is_floonet() { 13414 } else { 3414 };
PeerAddr(SocketAddr::new(addr, port))
}
/// If the ip is loopback then our key is "ip:port" (mainly for local usernet testing).
/// Otherwise we only care about the ip (we disallow multiple peers on the same ip address).
pub fn as_key(&self) -> String {
if self.0.ip().is_loopback() {
format!("{}:{}", self.0.ip(), self.0.port())
} else {
format!("{}", self.0.ip())
}
}
}
/// Configuration for the peer-to-peer server. /// Configuration for the peer-to-peer server.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct P2PConfig { pub struct P2PConfig {
@ -106,18 +209,18 @@ pub struct P2PConfig {
pub seeding_type: Seeding, pub seeding_type: Seeding,
/// The list of seed nodes, if using Seeding as a seed type /// The list of seed nodes, if using Seeding as a seed type
pub seeds: Option<Vec<String>>, pub seeds: Option<Vec<PeerAddr>>,
/// Capabilities expose by this node, also conditions which other peers this /// Capabilities expose by this node, also conditions which other peers this
/// node will have an affinity toward when connection. /// node will have an affinity toward when connection.
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub peers_allow: Option<Vec<String>>, pub peers_allow: Option<Vec<PeerAddr>>,
pub peers_deny: Option<Vec<String>>, pub peers_deny: Option<Vec<PeerAddr>>,
/// The list of preferred peers that we will try to connect to /// The list of preferred peers that we will try to connect to
pub peers_preferred: Option<Vec<String>>, pub peers_preferred: Option<Vec<PeerAddr>>,
pub ban_window: Option<i64>, pub ban_window: Option<i64>,
@ -125,7 +228,7 @@ pub struct P2PConfig {
pub peer_min_preferred_count: Option<u32>, pub peer_min_preferred_count: Option<u32>,
pub dandelion_peer: Option<SocketAddr>, pub dandelion_peer: Option<PeerAddr>,
} }
/// Default address for peer-to-peer connections. /// Default address for peer-to-peer connections.
@ -178,7 +281,7 @@ impl P2PConfig {
} }
/// Type of seeding the server will use to find other peers on the network. /// Type of seeding the server will use to find other peers on the network.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum Seeding { pub enum Seeding {
/// No seeding, mostly for tests that programmatically connect /// No seeding, mostly for tests that programmatically connect
None, None,
@ -262,7 +365,7 @@ pub struct PeerInfo {
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub user_agent: String, pub user_agent: String,
pub version: u32, pub version: u32,
pub addr: SocketAddr, pub addr: PeerAddr,
pub direction: Direction, pub direction: Direction,
pub live_info: Arc<RwLock<PeerLiveInfo>>, pub live_info: Arc<RwLock<PeerLiveInfo>>,
} }
@ -307,7 +410,7 @@ pub struct PeerInfoDisplay {
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub user_agent: String, pub user_agent: String,
pub version: u32, pub version: u32,
pub addr: SocketAddr, pub addr: PeerAddr,
pub direction: Direction, pub direction: Direction,
pub total_difficulty: Difficulty, pub total_difficulty: Difficulty,
pub height: u64, pub height: u64,
@ -353,22 +456,22 @@ pub trait ChainAdapter: Sync + Send {
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction>; fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction>;
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr); fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr);
/// A block has been received from one of our peers. Returns true if the /// A block has been received from one of our peers. Returns true if the
/// block could be handled properly and is not deemed defective by the /// block could be handled properly and is not deemed defective by the
/// chain. Returning false means the block will never be valid and /// chain. Returning false means the block will never be valid and
/// may result in the peer being banned. /// may result in the peer being banned.
fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool; fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool;
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool; fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool;
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool; fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool;
/// A set of block header has been received, typically in response to a /// A set of block header has been received, typically in response to a
/// block /// block
/// header request. /// header request.
fn headers_received(&self, bh: &[core::BlockHeader], addr: SocketAddr) -> bool; fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool;
/// Finds a list of block headers based on the provided locator. Tries to /// Finds a list of block headers based on the provided locator. Tries to
/// identify the common chain and gets the headers that follow it /// identify the common chain and gets the headers that follow it
@ -401,7 +504,7 @@ pub trait ChainAdapter: Sync + Send {
/// If we're willing to accept that new state, the data stream will be /// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be /// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes. /// rewound to the provided indexes.
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool; fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool;
} }
/// Additional methods required by the protocol that don't need to be /// Additional methods required by the protocol that don't need to be
@ -409,14 +512,14 @@ pub trait ChainAdapter: Sync + Send {
pub trait NetAdapter: ChainAdapter { pub trait NetAdapter: ChainAdapter {
/// Find good peers we know with the provided capability and return their /// Find good peers we know with the provided capability and return their
/// addresses. /// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr>; fn find_peer_addrs(&self, capab: Capabilities) -> Vec<PeerAddr>;
/// A list of peers has been received from one of our peers. /// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, _: Vec<SocketAddr>); fn peer_addrs_received(&self, _: Vec<PeerAddr>);
/// Heard total_difficulty from a connected peer (via ping/pong). /// Heard total_difficulty from a connected peer (via ping/pong).
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _: u64); fn peer_difficulty(&self, _: PeerAddr, _: Difficulty, _: u64);
/// Is this peer currently banned? /// Is this peer currently banned?
fn is_banned(&self, addr: SocketAddr) -> bool; fn is_banned(&self, addr: PeerAddr) -> bool;
} }

View file

@ -25,6 +25,7 @@ use std::{thread, time};
use crate::core::core::hash::Hash; use crate::core::core::hash::Hash;
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::p2p::types::PeerAddr;
use crate::p2p::Peer; use crate::p2p::Peer;
fn open_port() -> u16 { fn open_port() -> u16 {
@ -70,7 +71,7 @@ fn peer_handshake() {
let addr = SocketAddr::new(p2p_config.host, p2p_config.port); let addr = SocketAddr::new(p2p_config.host, p2p_config.port);
let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap(); let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();
let my_addr = "127.0.0.1:5000".parse().unwrap(); let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap());
let mut peer = Peer::connect( let mut peer = Peer::connect(
&mut socket, &mut socket,
p2p::Capabilities::UNKNOWN, p2p::Capabilities::UNKNOWN,
@ -89,7 +90,7 @@ fn peer_handshake() {
peer.send_ping(Difficulty::min(), 0).unwrap(); peer.send_ping(Difficulty::min(), 0).unwrap();
thread::sleep(time::Duration::from_secs(1)); thread::sleep(time::Duration::from_secs(1));
let server_peer = server.peers.get_connected_peer(&my_addr).unwrap(); let server_peer = server.peers.get_connected_peer(my_addr).unwrap();
assert_eq!(server_peer.info.total_difficulty(), Difficulty::min()); assert_eq!(server_peer.info.total_difficulty(), Difficulty::min());
assert!(server.peers.peer_count() > 0); assert!(server.peers.peer_count() > 0);
} }

View file

@ -17,7 +17,6 @@
use crate::util::RwLock; use crate::util::RwLock;
use std::fs::File; use std::fs::File;
use std::net::SocketAddr;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::thread; use std::thread;
use std::time::Instant; use std::time::Instant;
@ -31,6 +30,7 @@ use crate::core::core::{BlockHeader, BlockSums, CompactBlock};
use crate::core::pow::Difficulty; use crate::core::pow::Difficulty;
use crate::core::{core, global}; use crate::core::{core, global};
use crate::p2p; use crate::p2p;
use crate::p2p::types::PeerAddr;
use crate::pool; use crate::pool;
use crate::util::OneTime; use crate::util::OneTime;
use chrono::prelude::*; use chrono::prelude::*;
@ -62,7 +62,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash) self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash)
} }
fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
// nothing much we can do with a new transaction while syncing // nothing much we can do with a new transaction while syncing
if self.sync_state.is_syncing() { if self.sync_state.is_syncing() {
return; return;
@ -71,7 +71,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash); let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash);
if tx.is_none() { if tx.is_none() {
self.request_transaction(kernel_hash, &addr); self.request_transaction(kernel_hash, addr);
} }
} }
@ -107,7 +107,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} }
} }
fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool { fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool {
debug!( debug!(
"Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.",
b.hash(), b.hash(),
@ -120,7 +120,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
self.process_block(b, addr, was_requested) self.process_block(b, addr, was_requested)
} }
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
let bhash = cb.hash(); let bhash = cb.hash();
debug!( debug!(
"Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.", "Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.",
@ -187,7 +187,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} else { } else {
if self.sync_state.status() == SyncStatus::NoSync { if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block"); debug!("adapter: block invalid after hydration, requesting full block");
self.request_block(&cb.header, &addr); self.request_block(&cb.header, addr);
true true
} else { } else {
debug!("block invalid after hydration, ignoring it, cause still syncing"); debug!("block invalid after hydration, ignoring it, cause still syncing");
@ -201,7 +201,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
} }
} }
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
let bhash = bh.hash(); let bhash = bh.hash();
debug!( debug!(
"Received block header {} at {} from {}, going to process.", "Received block header {} at {} from {}, going to process.",
@ -227,13 +227,13 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// we have successfully processed a block header // we have successfully processed a block header
// so we can go request the block itself // so we can go request the block itself
self.request_compact_block(&bh, &addr); self.request_compact_block(&bh, addr);
// done receiving the header // done receiving the header
true true
} }
fn headers_received(&self, bhs: &[core::BlockHeader], addr: SocketAddr) -> bool { fn headers_received(&self, bhs: &[core::BlockHeader], addr: PeerAddr) -> bool {
info!("Received {} block headers from {}", bhs.len(), addr,); info!("Received {} block headers from {}", bhs.len(), addr,);
if bhs.len() == 0 { if bhs.len() == 0 {
@ -342,7 +342,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
/// If we're willing to accept that new state, the data stream will be /// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be /// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes. /// rewound to the provided indexes.
fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: SocketAddr) -> bool { fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: PeerAddr) -> bool {
// check status again after download, in case 2 txhashsets made it somehow // check status again after download, in case 2 txhashsets made it somehow
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else { } else {
@ -421,7 +421,7 @@ impl NetToChainAdapter {
// pushing the new block through the chain pipeline // pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block // remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool { fn process_block(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool {
// We cannot process blocks earlier than the horizon so check for this here. // We cannot process blocks earlier than the horizon so check for this here.
{ {
let head = self.chain().head().unwrap(); let head = self.chain().head().unwrap();
@ -458,7 +458,7 @@ impl NetToChainAdapter {
&& !self.sync_state.is_syncing() && !self.sync_state.is_syncing()
{ {
debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash()); debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash());
self.request_block_by_hash(previous.hash(), &addr) self.request_block_by_hash(previous.hash(), addr)
} }
} }
true true
@ -525,7 +525,7 @@ impl NetToChainAdapter {
} }
} }
fn request_transaction(&self, h: Hash, addr: &SocketAddr) { fn request_transaction(&self, h: Hash, addr: PeerAddr) {
self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h)) self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h))
} }
@ -533,24 +533,24 @@ impl NetToChainAdapter {
// it into a full block then fallback to requesting the full block // it into a full block then fallback to requesting the full block
// from the same peer that gave us the compact block // from the same peer that gave us the compact block
// consider additional peers for redundancy? // consider additional peers for redundancy?
fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) { fn request_block(&self, bh: &BlockHeader, addr: PeerAddr) {
self.request_block_by_hash(bh.hash(), addr) self.request_block_by_hash(bh.hash(), addr)
} }
fn request_block_by_hash(&self, h: Hash, addr: &SocketAddr) { fn request_block_by_hash(&self, h: Hash, addr: PeerAddr) {
self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h)) self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h))
} }
// After we have received a block header in "header first" propagation // After we have received a block header in "header first" propagation
// we need to go request the block (compact representation) from the // we need to go request the block (compact representation) from the
// same peer that gave us the header (unless we have already accepted the block) // same peer that gave us the header (unless we have already accepted the block)
fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) { fn request_compact_block(&self, bh: &BlockHeader, addr: PeerAddr) {
self.send_block_request_to_peer(bh.hash(), addr, |peer, h| { self.send_block_request_to_peer(bh.hash(), addr, |peer, h| {
peer.send_compact_block_request(h) peer.send_compact_block_request(h)
}) })
} }
fn send_tx_request_to_peer<F>(&self, h: Hash, addr: &SocketAddr, f: F) fn send_tx_request_to_peer<F>(&self, h: Hash, addr: PeerAddr, f: F)
where where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{ {
@ -567,7 +567,7 @@ impl NetToChainAdapter {
} }
} }
fn send_block_request_to_peer<F>(&self, h: Hash, addr: &SocketAddr, f: F) fn send_block_request_to_peer<F>(&self, h: Hash, addr: PeerAddr, f: F)
where where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{ {

View file

@ -21,12 +21,13 @@ use chrono::prelude::{DateTime, Utc};
use chrono::{Duration, MIN_DATE}; use chrono::{Duration, MIN_DATE};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::ToSocketAddrs;
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
use std::{cmp, str, thread, time}; use std::{cmp, str, thread, time};
use crate::core::global; use crate::core::global;
use crate::p2p; use crate::p2p;
use crate::p2p::types::PeerAddr;
use crate::p2p::ChainAdapter; use crate::p2p::ChainAdapter;
use crate::pool::DandelionConfig; use crate::pool::DandelionConfig;
use crate::util::{Mutex, StopState}; use crate::util::{Mutex, StopState};
@ -52,8 +53,8 @@ pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>, p2p_server: Arc<p2p::Server>,
capabilities: p2p::Capabilities, capabilities: p2p::Capabilities,
dandelion_config: DandelionConfig, dandelion_config: DandelionConfig,
seed_list: Box<dyn Fn() -> Vec<SocketAddr> + Send>, seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
preferred_peers: Option<Vec<SocketAddr>>, preferred_peers: Option<Vec<PeerAddr>>,
stop_state: Arc<Mutex<StopState>>, stop_state: Arc<Mutex<StopState>>,
) { ) {
let _ = thread::Builder::new() let _ = thread::Builder::new()
@ -78,7 +79,7 @@ pub fn connect_and_monitor(
let mut prev_ping = Utc::now(); let mut prev_ping = Utc::now();
let mut start_attempt = 0; let mut start_attempt = 0;
let mut connecting_history: HashMap<SocketAddr, DateTime<Utc>> = HashMap::new(); let mut connecting_history: HashMap<PeerAddr, DateTime<Utc>> = HashMap::new();
loop { loop {
if stop_state.lock().is_stopped() { if stop_state.lock().is_stopped() {
@ -140,8 +141,8 @@ pub fn connect_and_monitor(
fn monitor_peers( fn monitor_peers(
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
config: p2p::P2PConfig, config: p2p::P2PConfig,
tx: mpsc::Sender<SocketAddr>, tx: mpsc::Sender<PeerAddr>,
preferred_peers_list: Option<Vec<SocketAddr>>, preferred_peers_list: Option<Vec<PeerAddr>>,
) { ) {
// regularly check if we need to acquire more peers and if so, gets // regularly check if we need to acquire more peers and if so, gets
// them from db // them from db
@ -156,7 +157,7 @@ fn monitor_peers(
let interval = Utc::now().timestamp() - x.last_banned; let interval = Utc::now().timestamp() - x.last_banned;
// Unban peer // Unban peer
if interval >= config.ban_window() { if interval >= config.ban_window() {
peers.unban_peer(&x.addr); peers.unban_peer(x.addr);
debug!( debug!(
"monitor_peers: unbanned {} after {} seconds", "monitor_peers: unbanned {} after {} seconds",
x.addr, interval x.addr, interval
@ -192,7 +193,7 @@ fn monitor_peers(
// loop over connected peers // loop over connected peers
// ask them for their list of peers // ask them for their list of peers
let mut connected_peers: Vec<SocketAddr> = vec![]; let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers.connected_peers() { for p in peers.connected_peers() {
trace!( trace!(
"monitor_peers: {}:{} ask {} for more peers", "monitor_peers: {}:{} ask {} for more peers",
@ -205,19 +206,16 @@ fn monitor_peers(
} }
// Attempt to connect to preferred peers if there is some // Attempt to connect to preferred peers if there is some
match preferred_peers_list { if let Some(preferred_peers) = preferred_peers_list {
Some(preferred_peers) => { for p in preferred_peers {
for p in preferred_peers { if !connected_peers.is_empty() {
if !connected_peers.is_empty() { if !connected_peers.contains(&p) {
if !connected_peers.contains(&p) {
tx.send(p).unwrap();
}
} else {
tx.send(p).unwrap(); tx.send(p).unwrap();
} }
} else {
tx.send(p).unwrap();
} }
} }
None => debug!("monitor_peers: no preferred peers"),
} }
// take a random defunct peer and mark it healthy: over a long period any // take a random defunct peer and mark it healthy: over a long period any
@ -235,7 +233,7 @@ fn monitor_peers(
config.peer_max_count() as usize, config.peer_max_count() as usize,
); );
for p in new_peers.iter().filter(|p| !peers.is_known(&p.addr)) { for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) {
trace!( trace!(
"monitor_peers: on {}:{}, queue to soon try {}", "monitor_peers: on {}:{}, queue to soon try {}",
config.host, config.host,
@ -265,9 +263,9 @@ fn update_dandelion_relay(peers: Arc<p2p::Peers>, dandelion_config: DandelionCon
// otherwise use the seeds provided. // otherwise use the seeds provided.
fn connect_to_seeds_and_preferred_peers( fn connect_to_seeds_and_preferred_peers(
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
tx: mpsc::Sender<SocketAddr>, tx: mpsc::Sender<PeerAddr>,
seed_list: Box<dyn Fn() -> Vec<SocketAddr>>, seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
peers_preferred_list: Option<Vec<SocketAddr>>, peers_preferred_list: Option<Vec<PeerAddr>>,
) { ) {
// check if we have some peers in db // check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability) // look for peers that are able to give us other peers (via PEER_LIST capability)
@ -303,14 +301,14 @@ fn listen_for_addrs(
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
p2p: Arc<p2p::Server>, p2p: Arc<p2p::Server>,
capab: p2p::Capabilities, capab: p2p::Capabilities,
rx: &mpsc::Receiver<SocketAddr>, rx: &mpsc::Receiver<PeerAddr>,
connecting_history: &mut HashMap<SocketAddr, DateTime<Utc>>, connecting_history: &mut HashMap<PeerAddr, DateTime<Utc>>,
) { ) {
// Pull everything currently on the queue off the queue. // Pull everything currently on the queue off the queue.
// Does not block so addrs may be empty. // Does not block so addrs may be empty.
// We will take(max_peers) from this later but we want to drain the rx queue // We will take(max_peers) from this later but we want to drain the rx queue
// here to prevent it backing up. // here to prevent it backing up.
let addrs: Vec<SocketAddr> = rx.try_iter().collect(); let addrs: Vec<PeerAddr> = rx.try_iter().collect();
// If we have a healthy number of outbound peers then we are done here. // If we have a healthy number of outbound peers then we are done here.
if peers.healthy_peers_mix() { if peers.healthy_peers_mix() {
@ -342,7 +340,7 @@ fn listen_for_addrs(
let p2p_c = p2p.clone(); let p2p_c = p2p.clone();
let _ = thread::Builder::new() let _ = thread::Builder::new()
.name("peer_connect".to_string()) .name("peer_connect".to_string())
.spawn(move || match p2p_c.connect(&addr) { .spawn(move || match p2p_c.connect(addr) {
Ok(p) => { Ok(p) => {
let _ = p.send_peer_request(capab); let _ = p.send_peer_request(capab);
let _ = peers_c.update_state(addr, p2p::State::Healthy); let _ = peers_c.update_state(addr, p2p::State::Healthy);
@ -368,9 +366,9 @@ fn listen_for_addrs(
} }
} }
pub fn dns_seeds() -> Box<dyn Fn() -> Vec<SocketAddr> + Send> { pub fn dns_seeds() -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(|| { Box::new(|| {
let mut addresses: Vec<SocketAddr> = vec![]; let mut addresses: Vec<PeerAddr> = vec![];
let net_seeds = if global::is_floonet() { let net_seeds = if global::is_floonet() {
FLOONET_DNS_SEEDS FLOONET_DNS_SEEDS
} else { } else {
@ -384,7 +382,7 @@ pub fn dns_seeds() -> Box<dyn Fn() -> Vec<SocketAddr> + Send> {
&mut (addrs &mut (addrs
.map(|mut addr| { .map(|mut addr| {
addr.set_port(if global::is_floonet() { 13414 } else { 3414 }); addr.set_port(if global::is_floonet() { 13414 } else { 3414 });
addr PeerAddr(addr)
}) })
.filter(|addr| !temp_addresses.contains(addr)) .filter(|addr| !temp_addresses.contains(addr))
.collect()), .collect()),
@ -399,26 +397,6 @@ pub fn dns_seeds() -> Box<dyn Fn() -> Vec<SocketAddr> + Send> {
/// Convenience function when the seed list is immediately known. Mostly used /// Convenience function when the seed list is immediately known. Mostly used
/// for tests. /// for tests.
pub fn predefined_seeds(addrs_str: Vec<String>) -> Box<dyn Fn() -> Vec<SocketAddr> + Send> { pub fn predefined_seeds(addrs: Vec<PeerAddr>) -> Box<dyn Fn() -> Vec<PeerAddr> + Send> {
Box::new(move || { Box::new(move || addrs.clone())
addrs_str
.iter()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>()
})
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn preferred_peers(addrs_str: Vec<String>) -> Option<Vec<SocketAddr>> {
if addrs_str.is_empty() {
None
} else {
Some(
addrs_str
.iter()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>(),
)
}
} }

View file

@ -16,7 +16,6 @@
//! the peer-to-peer server, the blockchain and the transaction pool) and acts //! the peer-to-peer server, the blockchain and the transaction pool) and acts
//! as a facade. //! as a facade.
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::{thread, time}; use std::{thread, time};
@ -35,6 +34,7 @@ use crate::grin::{dandelion_monitor, seed, sync};
use crate::mining::stratumserver; use crate::mining::stratumserver;
use crate::mining::test_miner::Miner; use crate::mining::test_miner::Miner;
use crate::p2p; use crate::p2p;
use crate::p2p::types::PeerAddr;
use crate::pool; use crate::pool;
use crate::store; use crate::store;
use crate::util::file::get_first_line; use crate::util::file::get_first_line;
@ -103,7 +103,7 @@ impl Server {
} }
/// Instantiates a new server associated with the provided future reactor. /// Instantiates a new server associated with the provided future reactor.
pub fn new(mut config: ServerConfig) -> Result<Server, Error> { pub fn new(config: ServerConfig) -> Result<Server, Error> {
// Defaults to None (optional) in config file. // Defaults to None (optional) in config file.
// This translates to false here. // This translates to false here.
let archive_mode = match config.archive_mode { let archive_mode = match config.archive_mode {
@ -178,30 +178,25 @@ impl Server {
pool_net_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone());
net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone());
if config.p2p_config.seeding_type.clone() != p2p::Seeding::Programmatic { if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
let seeder = match config.p2p_config.seeding_type.clone() { let seeder = match config.p2p_config.seeding_type {
p2p::Seeding::None => { p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to"); warn!("No seed configured, will stay solo until connected to");
seed::predefined_seeds(vec![]) seed::predefined_seeds(vec![])
} }
p2p::Seeding::List => { p2p::Seeding::List => {
seed::predefined_seeds(config.p2p_config.seeds.as_mut().unwrap().clone()) seed::predefined_seeds(config.p2p_config.seeds.clone().unwrap())
} }
p2p::Seeding::DNSSeed => seed::dns_seeds(), p2p::Seeding::DNSSeed => seed::dns_seeds(),
_ => unreachable!(), _ => unreachable!(),
}; };
let peers_preferred = match config.p2p_config.peers_preferred.clone() {
Some(peers_preferred) => seed::preferred_peers(peers_preferred),
None => None,
};
seed::connect_and_monitor( seed::connect_and_monitor(
p2p_server.clone(), p2p_server.clone(),
config.p2p_config.capabilities, config.p2p_config.capabilities,
config.dandelion_config.clone(), config.dandelion_config.clone(),
seeder, seeder,
peers_preferred, config.p2p_config.peers_preferred.clone(),
stop_state.clone(), stop_state.clone(),
); );
} }
@ -273,8 +268,8 @@ impl Server {
} }
/// Asks the server to connect to a peer at the provided network address. /// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> { pub fn connect_peer(&self, addr: PeerAddr) -> Result<(), Error> {
self.p2p.connect(&addr)?; self.p2p.connect(addr)?;
Ok(()) Ok(())
} }

View file

@ -148,7 +148,7 @@ impl HeaderSync {
&& highest_height == peer.info.height() && highest_height == peer.info.height()
{ {
self.peers self.peers
.ban_peer(&peer.info.addr, ReasonForBan::FraudHeight); .ban_peer(peer.info.addr, ReasonForBan::FraudHeight);
info!( info!(
"sync: ban a fraud peer: {}, claimed height: {}, total difficulty: {}", "sync: ban a fraud peer: {}, claimed height: {}, total difficulty: {}",
peer.info.addr, peer.info.addr,

View file

@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
use self::keychain::Keychain; use self::keychain::Keychain;
use self::p2p::PeerAddr;
use self::util::Mutex; use self::util::Mutex;
use self::wallet::{HTTPNodeClient, HTTPWalletCommAdapter, LMDBBackend, WalletConfig}; use self::wallet::{HTTPNodeClient, HTTPWalletCommAdapter, LMDBBackend, WalletConfig};
use blake2_rfc as blake2; use blake2_rfc as blake2;
@ -191,7 +192,7 @@ impl LocalServerContainer {
if self.config.seed_addr.len() > 0 { if self.config.seed_addr.len() > 0 {
seeding_type = p2p::Seeding::List; seeding_type = p2p::Seeding::List;
seeds = vec![self.config.seed_addr.to_string()]; seeds = vec![PeerAddr(self.config.seed_addr.parse().unwrap())];
} }
let s = servers::Server::new(servers::ServerConfig { let s = servers::Server::new(servers::ServerConfig {
@ -233,9 +234,9 @@ impl LocalServerContainer {
s.start_test_miner(wallet_url, s.stop_state.clone()); s.start_test_miner(wallet_url, s.stop_state.clone());
} }
for p in &mut self.peer_list { for p in &self.peer_list {
println!("{} connecting to peer: {}", self.config.p2p_server_port, p); println!("{} connecting to peer: {}", self.config.p2p_server_port, p);
let _ = s.connect_peer(p.parse().unwrap()); let _ = s.connect_peer(PeerAddr(p.parse().unwrap()));
} }
if self.wallet_is_running { if self.wallet_is_running {
@ -647,7 +648,9 @@ pub fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig
p2p_config: p2p::P2PConfig { p2p_config: p2p::P2PConfig {
port: 10000 + n, port: 10000 + n,
seeding_type: p2p::Seeding::List, seeding_type: p2p::Seeding::List,
seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]), seeds: Some(vec![PeerAddr(
format!("127.0.0.1:{}", 10000 + seed_n).parse().unwrap(),
)]),
..p2p::P2PConfig::default() ..p2p::P2PConfig::default()
}, },
chain_type: core::global::ChainTypes::AutomatedTesting, chain_type: core::global::ChainTypes::AutomatedTesting,

View file

@ -19,6 +19,7 @@ mod framework;
use self::core::core::hash::Hashed; use self::core::core::hash::Hashed;
use self::core::global::{self, ChainTypes}; use self::core::global::{self, ChainTypes};
use self::p2p::PeerAddr;
use self::util::{Mutex, StopState}; use self::util::{Mutex, StopState};
use self::wallet::controller; use self::wallet::controller;
use self::wallet::libwallet::types::{WalletBackend, WalletInst}; use self::wallet::libwallet::types::{WalletBackend, WalletInst};
@ -933,7 +934,9 @@ fn replicate_tx_fluff_failure() {
// Server 2 (another node) // Server 2 (another node)
let mut s2_config = framework::config(3001, "tx_fluff", 3001); let mut s2_config = framework::config(3001, "tx_fluff", 3001);
s2_config.p2p_config.seeds = Some(vec!["127.0.0.1:13000".to_owned()]); s2_config.p2p_config.seeds = Some(vec![PeerAddr(
"127.0.0.1:13000".to_owned().parse().unwrap(),
)]);
s2_config.dandelion_config.embargo_secs = Some(10); s2_config.dandelion_config.embargo_secs = Some(10);
s2_config.dandelion_config.patience_secs = Some(1); s2_config.dandelion_config.patience_secs = Some(1);
s2_config.dandelion_config.relay_secs = Some(1); s2_config.dandelion_config.relay_secs = Some(1);
@ -944,7 +947,9 @@ fn replicate_tx_fluff_failure() {
for i in 0..dl_nodes { for i in 0..dl_nodes {
// (create some stem nodes) // (create some stem nodes)
let mut s_config = framework::config(3002 + i, "tx_fluff", 3002 + i); let mut s_config = framework::config(3002 + i, "tx_fluff", 3002 + i);
s_config.p2p_config.seeds = Some(vec!["127.0.0.1:13000".to_owned()]); s_config.p2p_config.seeds = Some(vec![PeerAddr(
"127.0.0.1:13000".to_owned().parse().unwrap(),
)]);
s_config.dandelion_config.embargo_secs = Some(10); s_config.dandelion_config.embargo_secs = Some(10);
s_config.dandelion_config.patience_secs = Some(1); s_config.dandelion_config.patience_secs = Some(1);
s_config.dandelion_config.relay_secs = Some(1); s_config.dandelion_config.relay_secs = Some(1);

View file

@ -24,7 +24,7 @@ use ctrlc;
use crate::config::GlobalConfig; use crate::config::GlobalConfig;
use crate::core::global; use crate::core::global;
use crate::p2p::Seeding; use crate::p2p::{PeerAddr, Seeding};
use crate::servers; use crate::servers;
use crate::tui::ui; use crate::tui::ui;
@ -116,45 +116,15 @@ pub fn server_command(
} }
if let Some(seeds) = a.values_of("seed") { if let Some(seeds) = a.values_of("seed") {
let seed_addrs = seeds
.filter_map(|x| x.parse().ok())
.map(|x| PeerAddr(x))
.collect();
server_config.p2p_config.seeding_type = Seeding::List; server_config.p2p_config.seeding_type = Seeding::List;
server_config.p2p_config.seeds = Some(seeds.map(|s| s.to_string()).collect()); server_config.p2p_config.seeds = Some(seed_addrs);
} }
} }
/*if let Some(true) = server_config.run_wallet_listener {
let mut wallet_config = global_config.members.as_ref().unwrap().wallet.clone();
wallet::init_wallet_seed(wallet_config.clone());
let wallet = wallet::instantiate_wallet(wallet_config.clone(), "");
let _ = thread::Builder::new()
.name("wallet_listener".to_string())
.spawn(move || {
controller::foreign_listener(wallet, &wallet_config.api_listen_addr())
.unwrap_or_else(|e| {
panic!(
"Error creating wallet listener: {:?} Config: {:?}",
e, wallet_config
)
});
});
}
if let Some(true) = server_config.run_wallet_owner_api {
let mut wallet_config = global_config.members.unwrap().wallet;
let wallet = wallet::instantiate_wallet(wallet_config.clone(), "");
wallet::init_wallet_seed(wallet_config.clone());
let _ = thread::Builder::new()
.name("wallet_owner_listener".to_string())
.spawn(move || {
controller::owner_listener(wallet, "127.0.0.1:13420").unwrap_or_else(|e| {
panic!(
"Error creating wallet api listener: {:?} Config: {:?}",
e, wallet_config
)
});
});
}*/
if let Some(a) = server_args { if let Some(a) = server_args {
match a.subcommand() { match a.subcommand() {
("run", _) => { ("run", _) => {