diff --git a/api/src/handlers/peers_api.rs b/api/src/handlers/peers_api.rs index 692245dbc..6453432fb 100644 --- a/api/src/handlers/peers_api.rs +++ b/api/src/handlers/peers_api.rs @@ -14,7 +14,7 @@ use super::utils::w; use crate::p2p; -use crate::p2p::types::{PeerInfoDisplay, ReasonForBan}; +use crate::p2p::types::{PeerAddr, PeerInfoDisplay, ReasonForBan}; use crate::router::{Handler, ResponseFuture}; use crate::web::*; use hyper::{Body, Request, StatusCode}; @@ -57,16 +57,25 @@ pub struct PeerHandler { impl Handler for PeerHandler { fn get(&self, req: Request) -> ResponseFuture { let command = right_path_element!(req); - if let Ok(addr) = command.parse() { - match w(&self.peers).get_peer(addr) { - Ok(peer) => json_response(&peer), - Err(_) => response(StatusCode::NOT_FOUND, "peer not found"), - } + + // We support both "ip" and "ip:port" here for peer_addr. + // "ip:port" is only really useful for local usernet testing on loopback address. + // Normally we map peers to ip and only allow a single peer per ip address. + let peer_addr; + if let Ok(ip_addr) = command.parse() { + peer_addr = PeerAddr::from_ip(ip_addr); + } else if let Ok(addr) = command.parse() { + peer_addr = PeerAddr(addr); } else { - response( + return response( StatusCode::BAD_REQUEST, format!("peer address unrecognized: {}", req.uri().path()), - ) + ); + } + + match w(&self.peers).get_peer(peer_addr) { + Ok(peer) => json_response(&peer), + Err(_) => response(StatusCode::NOT_FOUND, "peer not found"), } } fn post(&self, req: Request) -> ResponseFuture { @@ -77,20 +86,23 @@ impl Handler for PeerHandler { }; let addr = match path_elems.next() { None => return response(StatusCode::BAD_REQUEST, "invalid url"), - Some(a) => match a.parse() { - Err(e) => { + Some(a) => { + if let Ok(ip_addr) = a.parse() { + PeerAddr::from_ip(ip_addr) + } else if let Ok(addr) = a.parse() { + PeerAddr(addr) + } else { return response( StatusCode::BAD_REQUEST, - format!("invalid peer address: {}", e), + format!("invalid peer address: {}", req.uri().path()), ); } - Ok(addr) => addr, - }, + } }; match command { - "ban" => w(&self.peers).ban_peer(&addr, ReasonForBan::ManualBan), - "unban" => w(&self.peers).unban_peer(&addr), + "ban" => w(&self.peers).ban_peer(addr, ReasonForBan::ManualBan), + "unban" => w(&self.peers).unban_peer(addr), _ => return response(StatusCode::BAD_REQUEST, "invalid command"), }; diff --git a/p2p/fuzz/fuzz_targets/read_sock_addr.rs b/p2p/fuzz/fuzz_targets/read_peer_addr.rs similarity index 65% rename from p2p/fuzz/fuzz_targets/read_sock_addr.rs rename to p2p/fuzz/fuzz_targets/read_peer_addr.rs index cbe9a26ea..723634a1a 100644 --- a/p2p/fuzz/fuzz_targets/read_sock_addr.rs +++ b/p2p/fuzz/fuzz_targets/read_peer_addr.rs @@ -5,9 +5,9 @@ extern crate grin_core; extern crate grin_p2p; use grin_core::ser; -use grin_p2p::msg::SockAddr; +use grin_p2p::types::PeerAddr; fuzz_target!(|data: &[u8]| { let mut d = data.clone(); - let _t: Result = ser::deserialize(&mut d); + let _t: Result = ser::deserialize(&mut d); }); diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index cd0ea197a..dbb1b14da 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -22,11 +22,9 @@ use rand::{thread_rng, Rng}; use crate::core::core::hash::Hash; use crate::core::pow::Difficulty; -use crate::msg::{ - read_message, write_message, Hand, Shake, SockAddr, Type, PROTOCOL_VERSION, USER_AGENT, -}; +use crate::msg::{read_message, write_message, Hand, Shake, Type, PROTOCOL_VERSION, USER_AGENT}; use crate::peer::Peer; -use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerInfo, PeerLiveInfo}; +use crate::types::{Capabilities, Direction, Error, P2PConfig, PeerAddr, PeerInfo, PeerLiveInfo}; /// Local generated nonce for peer connecting. /// Used for self-connecting detection (on receiver side), @@ -44,7 +42,7 @@ pub struct Handshake { /// a node id. nonces: Arc>>, /// Ring buffer of self addr(s) collected from PeerWithSelf detection (by nonce). - pub addrs: Arc>>, + pub addrs: Arc>>, /// The genesis block header of the chain seen by this node. /// We only want to connect to other nodes seeing the same chain (forks are /// ok). @@ -67,13 +65,13 @@ impl Handshake { &self, capab: Capabilities, total_difficulty: Difficulty, - self_addr: SocketAddr, + self_addr: PeerAddr, conn: &mut TcpStream, ) -> Result { // prepare the first part of the handshake let nonce = self.next_nonce(); let peer_addr = match conn.peer_addr() { - Ok(pa) => pa, + Ok(pa) => PeerAddr(pa), Err(e) => return Err(Error::Connection(e)), }; @@ -83,8 +81,8 @@ impl Handshake { nonce: nonce, genesis: self.genesis, total_difficulty: total_difficulty, - sender_addr: SockAddr(self_addr), - receiver_addr: SockAddr(peer_addr), + sender_addr: self_addr, + receiver_addr: peer_addr, user_agent: USER_AGENT.to_string(), }; @@ -118,7 +116,7 @@ impl Handshake { // If denied then we want to close the connection // (without providing our peer with any details why). - if Peer::is_denied(&self.config, &peer_info.addr) { + if Peer::is_denied(&self.config, peer_info.addr) { return Err(Error::ConnectionClose); } @@ -155,7 +153,7 @@ impl Handshake { } else { // check the nonce to see if we are trying to connect to ourselves let nonces = self.nonces.read(); - let addr = extract_ip(&hand.sender_addr.0, &conn); + let addr = resolve_peer_addr(hand.sender_addr, &conn); if nonces.contains(&hand.nonce) { // save ip addresses of ourselves let mut addrs = self.addrs.write(); @@ -171,7 +169,7 @@ impl Handshake { let peer_info = PeerInfo { capabilities: hand.capabilities, user_agent: hand.user_agent, - addr: extract_ip(&hand.sender_addr.0, &conn), + addr: resolve_peer_addr(hand.sender_addr, &conn), version: hand.version, live_info: Arc::new(RwLock::new(PeerLiveInfo { total_difficulty: hand.total_difficulty, @@ -186,7 +184,7 @@ impl Handshake { // so check if we are configured to explicitly allow or deny it. // If denied then we want to close the connection // (without providing our peer with any details why). - if Peer::is_denied(&self.config, &peer_info.addr) { + if Peer::is_denied(&self.config, peer_info.addr) { return Err(Error::ConnectionClose); } @@ -219,28 +217,12 @@ impl Handshake { } } -// Attempts to make a best guess at the correct remote IP by checking if the -// advertised address is the loopback and our TCP connection. Note that the -// port reported by the connection is always incorrect for receiving -// connections as it's dynamically allocated by the server. -fn extract_ip(advertised: &SocketAddr, conn: &TcpStream) -> SocketAddr { - match advertised { - &SocketAddr::V4(v4sock) => { - let ip = v4sock.ip(); - if ip.is_loopback() || ip.is_unspecified() { - if let Ok(addr) = conn.peer_addr() { - return SocketAddr::new(addr.ip(), advertised.port()); - } - } - } - &SocketAddr::V6(v6sock) => { - let ip = v6sock.ip(); - if ip.is_loopback() || ip.is_unspecified() { - if let Ok(addr) = conn.peer_addr() { - return SocketAddr::new(addr.ip(), advertised.port()); - } - } - } +/// Resolve the correct peer_addr based on the connection and the advertised port. +fn resolve_peer_addr(advertised: PeerAddr, conn: &TcpStream) -> PeerAddr { + let port = advertised.0.port(); + if let Ok(addr) = conn.peer_addr() { + PeerAddr(SocketAddr::new(addr.ip(), port)) + } else { + advertised } - advertised.clone() } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 4f967b1fb..3ba530201 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -52,6 +52,6 @@ pub use crate::peers::Peers; pub use crate::serv::{DummyAdapter, Server}; pub use crate::store::{PeerData, State}; pub use crate::types::{ - Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerInfo, ReasonForBan, Seeding, - TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, + Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, + Seeding, TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, }; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 3f9a6d733..8e7b5d151 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -16,7 +16,6 @@ use num::FromPrimitive; use std::io::{Read, Write}; -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::time; use crate::core::core::hash::Hash; @@ -25,7 +24,7 @@ use crate::core::pow::Difficulty; use crate::core::ser::{self, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer}; use crate::core::{consensus, global}; use crate::types::{ - Capabilities, Error, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, + Capabilities, Error, PeerAddr, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, }; use crate::util::read_write::read_exact; @@ -254,9 +253,9 @@ pub struct Hand { /// may be needed pub total_difficulty: Difficulty, /// network address of the sender - pub sender_addr: SockAddr, + pub sender_addr: PeerAddr, /// network address of the receiver - pub receiver_addr: SockAddr, + pub receiver_addr: PeerAddr, /// name of version of the software pub user_agent: String, } @@ -283,8 +282,8 @@ impl Readable for Hand { let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64); let capabilities = Capabilities::from_bits_truncate(capab); let total_diff = Difficulty::read(reader)?; - let sender_addr = SockAddr::read(reader)?; - let receiver_addr = SockAddr::read(reader)?; + let sender_addr = PeerAddr::read(reader)?; + let receiver_addr = PeerAddr::read(reader)?; let ua = reader.read_bytes_len_prefix()?; let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let genesis = Hash::read(reader)?; @@ -373,7 +372,7 @@ impl Readable for GetPeerAddrs { /// GetPeerAddrs. #[derive(Debug)] pub struct PeerAddrs { - pub peers: Vec, + pub peers: Vec, } impl Writeable for PeerAddrs { @@ -394,10 +393,9 @@ impl Readable for PeerAddrs { } else if peer_count == 0 { return Ok(PeerAddrs { peers: vec![] }); } - // let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader)); let mut peers = Vec::with_capacity(peer_count as usize); for _ in 0..peer_count { - peers.push(SockAddr::read(reader)?); + peers.push(PeerAddr::read(reader)?); } Ok(PeerAddrs { peers: peers }) } @@ -431,58 +429,6 @@ impl Readable for PeerError { } } -/// Only necessary so we can implement Readable and Writeable. Rust disallows -/// implementing traits when both types are outside of this crate (which is the -/// case for SocketAddr and Readable/Writeable). -#[derive(Debug)] -pub struct SockAddr(pub SocketAddr); - -impl Writeable for SockAddr { - fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - match self.0 { - SocketAddr::V4(sav4) => { - ser_multiwrite!( - writer, - [write_u8, 0], - [write_fixed_bytes, &sav4.ip().octets().to_vec()], - [write_u16, sav4.port()] - ); - } - SocketAddr::V6(sav6) => { - writer.write_u8(1)?; - for seg in &sav6.ip().segments() { - writer.write_u16(*seg)?; - } - writer.write_u16(sav6.port())?; - } - } - Ok(()) - } -} - -impl Readable for SockAddr { - fn read(reader: &mut dyn Reader) -> Result { - let v4_or_v6 = reader.read_u8()?; - if v4_or_v6 == 0 { - let ip = reader.read_fixed_bytes(4)?; - let port = reader.read_u16()?; - Ok(SockAddr(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), - port, - )))) - } else { - let ip = try_iter_map_vec!(0..8, |_| reader.read_u16()); - let port = reader.read_u16()?; - Ok(SockAddr(SocketAddr::V6(SocketAddrV6::new( - Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), - port, - 0, - 0, - )))) - } - } -} - /// Serializable wrapper for the block locator. #[derive(Debug)] pub struct Locator { diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index eecae9b25..1b50bf59a 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -14,7 +14,7 @@ use crate::util::{Mutex, RwLock}; use std::fs::File; -use std::net::{Shutdown, SocketAddr, TcpStream}; +use std::net::{Shutdown, TcpStream}; use std::sync::Arc; use crate::conn; @@ -25,7 +25,8 @@ use crate::handshake::Handshake; use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest}; use crate::protocol::Protocol; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerInfo, ReasonForBan, TxHashSetRead, + Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, + TxHashSetRead, }; use chrono::prelude::{DateTime, Utc}; @@ -93,7 +94,7 @@ impl Peer { conn: &mut TcpStream, capab: Capabilities, total_difficulty: Difficulty, - self_addr: SocketAddr, + self_addr: PeerAddr, hs: &Handshake, na: Arc, ) -> Result { @@ -124,10 +125,9 @@ impl Peer { self.connection = Some(Mutex::new(conn::listen(conn, handler))); } - pub fn is_denied(config: &P2PConfig, peer_addr: &SocketAddr) -> bool { - let peer = format!("{}:{}", peer_addr.ip(), peer_addr.port()); + pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool { if let Some(ref denied) = config.peers_deny { - if denied.contains(&peer) { + if denied.contains(&peer_addr) { debug!( "checking peer allowed/denied: {:?} explicitly denied", peer_addr @@ -136,7 +136,7 @@ impl Peer { } } if let Some(ref allowed) = config.peers_allow { - if allowed.contains(&peer) { + if allowed.contains(&peer_addr) { debug!( "checking peer allowed/denied: {:?} explicitly allowed", peer_addr @@ -566,7 +566,7 @@ impl ChainAdapter for TrackingAdapter { self.adapter.get_transaction(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) { self.push_recv(kernel_hash); self.adapter.tx_kernel_received(kernel_hash, addr) } @@ -582,23 +582,23 @@ impl ChainAdapter for TrackingAdapter { self.adapter.transaction_received(tx, stem) } - fn block_received(&self, b: core::Block, addr: SocketAddr, _was_requested: bool) -> bool { + fn block_received(&self, b: core::Block, addr: PeerAddr, _was_requested: bool) -> bool { let bh = b.hash(); self.push_recv(bh); self.adapter.block_received(b, addr, self.has_req(bh)) } - fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { + fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool { self.push_recv(cb.hash()); self.adapter.compact_block_received(cb, addr) } - fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { + fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool { self.push_recv(bh.hash()); self.adapter.header_received(bh, addr) } - fn headers_received(&self, bh: &[core::BlockHeader], addr: SocketAddr) -> bool { + fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool { self.adapter.headers_received(bh, addr) } @@ -618,7 +618,7 @@ impl ChainAdapter for TrackingAdapter { self.adapter.txhashset_receive_ready() } - fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool { + fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool { self.adapter.txhashset_write(h, txhashset_data, peer_addr) } @@ -634,19 +634,19 @@ impl ChainAdapter for TrackingAdapter { } impl NetAdapter for TrackingAdapter { - fn find_peer_addrs(&self, capab: Capabilities) -> Vec { + fn find_peer_addrs(&self, capab: Capabilities) -> Vec { self.adapter.find_peer_addrs(capab) } - fn peer_addrs_received(&self, addrs: Vec) { + fn peer_addrs_received(&self, addrs: Vec) { self.adapter.peer_addrs_received(addrs) } - fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { + fn peer_difficulty(&self, addr: PeerAddr, diff: Difficulty, height: u64) { self.adapter.peer_difficulty(addr, diff, height) } - fn is_banned(&self, addr: SocketAddr) -> bool { + fn is_banned(&self, addr: PeerAddr) -> bool { self.adapter.is_banned(addr) } } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index f60abbc4c..19713b7d8 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -15,7 +15,6 @@ use crate::util::RwLock; use std::collections::HashMap; use std::fs::File; -use std::net::SocketAddr; use std::sync::Arc; use rand::{thread_rng, Rng}; @@ -30,14 +29,14 @@ use chrono::Duration; use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ - Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, ReasonForBan, + Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; pub struct Peers { pub adapter: Arc, store: PeerStore, - peers: RwLock>>, + peers: RwLock>>, dandelion_relay: RwLock)>>, config: P2PConfig, } @@ -56,33 +55,25 @@ impl Peers { /// Adds the peer to our internal peer mapping. Note that the peer is still /// returned so the server can run it. pub fn add_connected(&self, peer: Arc) -> Result<(), Error> { - let peer_data: PeerData; - let addr: SocketAddr; - { - peer_data = PeerData { - addr: peer.info.addr, - capabilities: peer.info.capabilities, - user_agent: peer.info.user_agent.clone(), - flags: State::Healthy, - last_banned: 0, - ban_reason: ReasonForBan::None, - last_connected: Utc::now().timestamp(), - }; - addr = peer.info.addr.clone(); - } - debug!("Saving newly connected peer {}.", addr); + let peer_data = PeerData { + addr: peer.info.addr, + capabilities: peer.info.capabilities, + user_agent: peer.info.user_agent.clone(), + flags: State::Healthy, + last_banned: 0, + ban_reason: ReasonForBan::None, + last_connected: Utc::now().timestamp(), + }; + debug!("Saving newly connected peer {}.", peer_data.addr); self.save_peer(&peer_data)?; + self.peers.write().insert(peer_data.addr, peer.clone()); - { - let mut peers = self.peers.write(); - peers.insert(addr, peer.clone()); - } Ok(()) } /// Add a peer as banned to block future connections, usually due to failed /// handshake - pub fn add_banned(&self, addr: SocketAddr, ban_reason: ReasonForBan) -> Result<(), Error> { + pub fn add_banned(&self, addr: PeerAddr, ban_reason: ReasonForBan) -> Result<(), Error> { let peer_data = PeerData { addr, capabilities: Capabilities::UNKNOWN, @@ -129,18 +120,8 @@ impl Peers { self.dandelion_relay.read().clone() } - pub fn is_known(&self, addr: &SocketAddr) -> bool { - self.peers.read().contains_key(addr) - } - - /// Check whether an ip address is in the active peers list, ignore the port - pub fn is_known_ip(&self, addr: &SocketAddr) -> bool { - for socket in self.peers.read().keys() { - if addr.ip() == socket.ip() { - return true; - } - } - return false; + pub fn is_known(&self, addr: PeerAddr) -> bool { + self.peers.read().contains_key(&addr) } /// Get vec of peers we are currently connected to. @@ -166,8 +147,8 @@ impl Peers { } /// Get a peer we're connected to by address. - pub fn get_connected_peer(&self, addr: &SocketAddr) -> Option> { - self.peers.read().get(addr).map(|p| p.clone()) + pub fn get_connected_peer(&self, addr: PeerAddr) -> Option> { + self.peers.read().get(&addr).map(|p| p.clone()) } /// Number of peers currently connected to. @@ -257,31 +238,18 @@ impl Peers { self.most_work_peers().pop() } - pub fn is_banned(&self, peer_addr: SocketAddr) -> bool { - if global::is_production_mode() { - // Ban only cares about ip address, no mather what port. - // so, we query all saved peers with one same ip address, and ignore port - let peers_data = self.store.find_peers_by_ip(peer_addr); - for peer_data in peers_data { - if peer_data.flags == State::Banned { - return true; - } - } - } else { - // For travis-ci test, we need run multiple nodes in one server, with same ip address. - // so, just query the ip address and the port - if let Ok(peer_data) = self.store.get_peer(peer_addr) { - if peer_data.flags == State::Banned { - return true; - } + pub fn is_banned(&self, peer_addr: PeerAddr) -> bool { + if let Ok(peer) = self.store.get_peer(peer_addr) { + if peer.flags == State::Banned { + return true; } } false } /// Ban a peer, disconnecting it if we're currently connected - pub fn ban_peer(&self, peer_addr: &SocketAddr, ban_reason: ReasonForBan) { - if let Err(e) = self.update_state(*peer_addr, State::Banned) { + pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) { + if let Err(e) = self.update_state(peer_addr, State::Banned) { error!("Couldn't ban {}: {:?}", peer_addr, e); } @@ -295,12 +263,12 @@ impl Peers { } /// Unban a peer, checks if it exists and banned then unban - pub fn unban_peer(&self, peer_addr: &SocketAddr) { + pub fn unban_peer(&self, peer_addr: PeerAddr) { debug!("unban_peer: peer {}", peer_addr); - match self.get_peer(*peer_addr) { + match self.get_peer(peer_addr) { Ok(_) => { - if self.is_banned(*peer_addr) { - if let Err(e) = self.update_state(*peer_addr, State::Healthy) { + if self.is_banned(peer_addr) { + if let Err(e) = self.update_state(peer_addr, State::Healthy) { error!("Couldn't unban {}: {:?}", peer_addr, e); } } else { @@ -424,12 +392,12 @@ impl Peers { } /// Get peer in store by address - pub fn get_peer(&self, peer_addr: SocketAddr) -> Result { + pub fn get_peer(&self, peer_addr: PeerAddr) -> Result { self.store.get_peer(peer_addr).map_err(From::from) } /// Whether we've already seen a peer with the provided address - pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { + pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result { self.store.exists_peer(peer_addr).map_err(From::from) } @@ -439,7 +407,7 @@ impl Peers { } /// Updates the state of a peer in store - pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { + pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> { self.store .update_state(peer_addr, new_state) .map_err(From::from) @@ -495,9 +463,9 @@ impl Peers { // now clean up peer map based on the list to remove { let mut peers = self.peers.write(); - for p in rm { - let _ = peers.get(&p).map(|p| p.stop()); - peers.remove(&p); + for addr in rm { + let _ = peers.get(&addr).map(|peer| peer.stop()); + peers.remove(&addr); } } } @@ -558,7 +526,7 @@ impl ChainAdapter for Peers { self.adapter.get_transaction(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) { self.adapter.tx_kernel_received(kernel_hash, addr) } @@ -566,7 +534,7 @@ impl ChainAdapter for Peers { self.adapter.transaction_received(tx, stem) } - fn block_received(&self, b: core::Block, peer_addr: SocketAddr, was_requested: bool) -> bool { + fn block_received(&self, b: core::Block, peer_addr: PeerAddr, was_requested: bool) -> bool { let hash = b.hash(); if !self.adapter.block_received(b, peer_addr, was_requested) { // if the peer sent us a block that's intrinsically bad @@ -575,45 +543,45 @@ impl ChainAdapter for Peers { "Received a bad block {} from {}, the peer will be banned", hash, peer_addr ); - self.ban_peer(&peer_addr, ReasonForBan::BadBlock); + self.ban_peer(peer_addr, ReasonForBan::BadBlock); false } else { true } } - fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: SocketAddr) -> bool { + fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: PeerAddr) -> bool { let hash = cb.hash(); if !self.adapter.compact_block_received(cb, peer_addr) { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( "Received a bad compact block {} from {}, the peer will be banned", - hash, &peer_addr + hash, peer_addr ); - self.ban_peer(&peer_addr, ReasonForBan::BadCompactBlock); + self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock); false } else { true } } - fn header_received(&self, bh: core::BlockHeader, peer_addr: SocketAddr) -> bool { + fn header_received(&self, bh: core::BlockHeader, peer_addr: PeerAddr) -> bool { if !self.adapter.header_received(bh, peer_addr) { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban - self.ban_peer(&peer_addr, ReasonForBan::BadBlockHeader); + self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); false } else { true } } - fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: SocketAddr) -> bool { + fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: PeerAddr) -> bool { if !self.adapter.headers_received(headers, peer_addr) { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban - self.ban_peer(&peer_addr, ReasonForBan::BadBlockHeader); + self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); false } else { true @@ -636,13 +604,13 @@ impl ChainAdapter for Peers { self.adapter.txhashset_receive_ready() } - fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool { + fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool { if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) { debug!( "Received a bad txhashset data from {}, the peer will be banned", &peer_addr ); - self.ban_peer(&peer_addr, ReasonForBan::BadTxHashSet); + self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet); false } else { true @@ -663,14 +631,14 @@ impl ChainAdapter for Peers { impl NetAdapter for Peers { /// Find good peers we know with the provided capability and return their /// addresses. - fn find_peer_addrs(&self, capab: Capabilities) -> Vec { + fn find_peer_addrs(&self, capab: Capabilities) -> Vec { let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize); trace!("find_peer_addrs: {} healthy peers picked", peers.len()); map_vec!(peers, |p| p.addr) } /// A list of peers has been received from one of our peers. - fn peer_addrs_received(&self, peer_addrs: Vec) { + fn peer_addrs_received(&self, peer_addrs: Vec) { trace!("Received {} peer addrs, saving.", peer_addrs.len()); for pa in peer_addrs { if let Ok(e) = self.exists_peer(pa) { @@ -693,13 +661,13 @@ impl NetAdapter for Peers { } } - fn peer_difficulty(&self, addr: SocketAddr, diff: Difficulty, height: u64) { - if let Some(peer) = self.get_connected_peer(&addr) { + fn peer_difficulty(&self, addr: PeerAddr, diff: Difficulty, height: u64) { + if let Some(peer) = self.get_connected_peer(addr) { peer.info.update(height, diff); } } - fn is_banned(&self, addr: SocketAddr) -> bool { + fn is_banned(&self, addr: PeerAddr) -> bool { if let Ok(peer) = self.get_peer(addr) { peer.flags == State::Banned } else { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index b4dd4b5f9..38df07c75 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -16,7 +16,6 @@ use std::cmp; use std::env; use std::fs::File; use std::io::{BufWriter, Write}; -use std::net::SocketAddr; use std::sync::Arc; use crate::conn::{Message, MessageHandler, Response}; @@ -25,18 +24,18 @@ use crate::util::{RateCounter, RwLock}; use chrono::prelude::Utc; use crate::msg::{ - BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, TxHashSetArchive, + BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive, TxHashSetRequest, Type, }; -use crate::types::{Error, NetAdapter}; +use crate::types::{Error, NetAdapter, PeerAddr}; pub struct Protocol { adapter: Arc, - addr: SocketAddr, + addr: PeerAddr, } impl Protocol { - pub fn new(adapter: Arc, addr: SocketAddr) -> Protocol { + pub fn new(adapter: Arc, addr: PeerAddr) -> Protocol { Protocol { adapter, addr } } } @@ -231,19 +230,17 @@ impl MessageHandler for Protocol { Type::GetPeerAddrs => { let get_peers: GetPeerAddrs = msg.body()?; - let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities); + let peers = adapter.find_peer_addrs(get_peers.capabilities); Ok(Some(Response::new( Type::PeerAddrs, - PeerAddrs { - peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), - }, + PeerAddrs { peers }, writer, ))) } Type::PeerAddrs => { let peer_addrs: PeerAddrs = msg.body()?; - adapter.peer_addrs_received(peer_addrs.peers.iter().map(|pa| pa.0).collect()); + adapter.peer_addrs_received(peer_addrs.peers); Ok(None) } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 45fc38354..8be990846 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -29,7 +29,7 @@ use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, ReasonForBan, Seeding, TxHashSetRead, + Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan, TxHashSetRead, }; use crate::util::{Mutex, StopState}; use chrono::prelude::{DateTime, Utc}; @@ -82,6 +82,8 @@ impl Server { match listener.accept() { Ok((stream, peer_addr)) => { + let peer_addr = PeerAddr(peer_addr); + if self.check_undesirable(&stream) { continue; } @@ -107,8 +109,8 @@ impl Server { /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. - pub fn connect(&self, addr: &SocketAddr) -> Result, Error> { - if Peer::is_denied(&self.config, &addr) { + pub fn connect(&self, addr: PeerAddr) -> Result, Error> { + if Peer::is_denied(&self.config, addr) { debug!("connect_peer: peer {} denied, not connecting.", addr); return Err(Error::ConnectionClose); } @@ -134,7 +136,7 @@ impl Server { self.config.port, addr ); - match TcpStream::connect_timeout(addr, Duration::from_secs(10)) { + match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) { Ok(mut stream) => { let addr = SocketAddr::new(self.config.host, self.config.port); let total_diff = self.peers.total_difficulty(); @@ -143,7 +145,7 @@ impl Server { &mut stream, self.capabilities, total_diff, - addr, + PeerAddr(addr), &self.handshake, self.peers.clone(), )?; @@ -191,13 +193,17 @@ impl Server { /// different sets of peers themselves. In addition, it prevent potential /// duplicate connections, malicious or not. fn check_undesirable(&self, stream: &TcpStream) -> bool { - // peer has been banned, go away! if let Ok(peer_addr) = stream.peer_addr() { - let banned = self.peers.is_banned(peer_addr); - let known_ip = - self.peers.is_known_ip(&peer_addr) && self.config.seeding_type == Seeding::DNSSeed; - if banned || known_ip { - debug!("Peer {} banned or known, refusing connection.", peer_addr); + let peer_addr = PeerAddr(peer_addr); + if self.peers.is_banned(peer_addr) { + debug!("Peer {} banned, refusing connection.", peer_addr); + if let Err(e) = stream.shutdown(Shutdown::Both) { + debug!("Error shutting down conn: {:?}", e); + } + return true; + } + if self.peers.is_known(peer_addr) { + debug!("Peer {} already known, refusing connection.", peer_addr); if let Err(e) = stream.shutdown(Shutdown::Both) { debug!("Error shutting down conn: {:?}", e); } @@ -234,18 +240,18 @@ impl ChainAdapter for DummyAdapter { fn get_transaction(&self, _h: Hash) -> Option { None } - fn tx_kernel_received(&self, _h: Hash, _addr: SocketAddr) {} + fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) {} fn transaction_received(&self, _: core::Transaction, _stem: bool) {} - fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { + fn compact_block_received(&self, _cb: core::CompactBlock, _addr: PeerAddr) -> bool { true } - fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { + fn header_received(&self, _bh: core::BlockHeader, _addr: PeerAddr) -> bool { true } - fn block_received(&self, _: core::Block, _: SocketAddr, _: bool) -> bool { + fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> bool { true } - fn headers_received(&self, _: &[core::BlockHeader], _: SocketAddr) -> bool { + fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> bool { true } fn locate_headers(&self, _: &[Hash]) -> Vec { @@ -262,7 +268,7 @@ impl ChainAdapter for DummyAdapter { false } - fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: SocketAddr) -> bool { + fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: PeerAddr) -> bool { false } @@ -277,12 +283,12 @@ impl ChainAdapter for DummyAdapter { } impl NetAdapter for DummyAdapter { - fn find_peer_addrs(&self, _: Capabilities) -> Vec { + fn find_peer_addrs(&self, _: Capabilities) -> Vec { vec![] } - fn peer_addrs_received(&self, _: Vec) {} - fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _: u64) {} - fn is_banned(&self, _: SocketAddr) -> bool { + fn peer_addrs_received(&self, _: Vec) {} + fn peer_difficulty(&self, _: PeerAddr, _: Difficulty, _: u64) {} + fn is_banned(&self, _: PeerAddr) -> bool { false } } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 921070706..074cc066f 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -17,19 +17,17 @@ use chrono::Utc; use num::FromPrimitive; use rand::{thread_rng, Rng}; -use std::net::SocketAddr; use std::sync::Arc; use crate::lmdb; use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; -use crate::msg::SockAddr; -use crate::types::{Capabilities, ReasonForBan}; +use crate::types::{Capabilities, PeerAddr, ReasonForBan}; use grin_store::{self, option_to_not_found, to_key, Error}; const STORE_SUBPATH: &'static str = "peers"; -const PEER_PREFIX: u8 = 'p' as u8; +const PEER_PREFIX: u8 = 'P' as u8; /// Types of messages enum_from_primitive! { @@ -45,7 +43,7 @@ enum_from_primitive! { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerData { /// Network address of the peer. - pub addr: SocketAddr, + pub addr: PeerAddr, /// What capabilities the peer advertises. Unknown until a successful /// connection. pub capabilities: Capabilities, @@ -63,7 +61,7 @@ pub struct PeerData { impl Writeable for PeerData { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - SockAddr(self.addr).write(writer)?; + self.addr.write(writer)?; ser_multiwrite!( writer, [write_u32, self.capabilities.bits()], @@ -79,7 +77,7 @@ impl Writeable for PeerData { impl Readable for PeerData { fn read(reader: &mut dyn Reader) -> Result { - let addr = SockAddr::read(reader)?; + let addr = PeerAddr::read(reader)?; let capab = reader.read_u32()?; let ua = reader.read_bytes_len_prefix()?; let (fl, lb, br) = ser_multiread!(reader, read_u8, read_i64, read_i32); @@ -99,7 +97,7 @@ impl Readable for PeerData { match State::from_u8(fl) { Some(flags) => Ok(PeerData { - addr: addr.0, + addr, capabilities, user_agent, flags: flags, @@ -132,20 +130,20 @@ impl PeerStore { batch.commit() } - pub fn get_peer(&self, peer_addr: SocketAddr) -> Result { + pub fn get_peer(&self, peer_addr: PeerAddr) -> Result { option_to_not_found( self.db.get_ser(&peer_key(peer_addr)[..]), &format!("Peer at address: {}", peer_addr), ) } - pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { + pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result { self.db.exists(&peer_key(peer_addr)[..]) } /// TODO - allow below added to avoid github issue reports #[allow(dead_code)] - pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { + pub fn delete_peer(&self, peer_addr: PeerAddr) -> Result<(), Error> { let batch = self.db.batch()?; batch.delete(&peer_key(peer_addr)[..])?; batch.commit() @@ -162,17 +160,6 @@ impl PeerStore { peers.iter().take(count).cloned().collect() } - /// Query all peers with same IP address, and ignore the port - pub fn find_peers_by_ip(&self, peer_addr: SocketAddr) -> Vec { - self.db - .iter::(&to_key( - PEER_PREFIX, - &mut format!("{}", peer_addr.ip()).into_bytes(), - )) - .unwrap() - .collect::>() - } - /// List all known peers /// Used for /v1/peers/all api endpoint pub fn all_peers(&self) -> Vec { @@ -182,7 +169,7 @@ impl PeerStore { /// Convenience method to load a peer data, update its status and save it /// back. If new state is Banned its last banned time will be updated too. - pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { + pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> { let batch = self.db.batch()?; let mut peer = option_to_not_found( @@ -194,7 +181,7 @@ impl PeerStore { peer.last_banned = Utc::now().timestamp(); } - batch.put_ser(&peer_key(peer.addr)[..], &peer)?; + batch.put_ser(&peer_key(peer_addr)[..], &peer)?; batch.commit() } @@ -226,9 +213,7 @@ impl PeerStore { } } -fn peer_key(peer_addr: SocketAddr) -> Vec { - to_key( - PEER_PREFIX, - &mut format!("{}:{}", peer_addr.ip(), peer_addr.port()).into_bytes(), - ) +// Ignore the port unless ip is loopback address. +fn peer_key(peer_addr: PeerAddr) -> Vec { + to_key(PEER_PREFIX, &mut peer_addr.as_key().into_bytes()) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 60c6eef5c..c6cffefdd 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -16,15 +16,18 @@ use crate::util::RwLock; use std::convert::From; use std::fs::File; use std::io; -use std::net::{IpAddr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; + use std::sync::mpsc; use std::sync::Arc; use chrono::prelude::*; +use crate::core::core; use crate::core::core::hash::Hash; +use crate::core::global; use crate::core::pow::Difficulty; -use crate::core::{core, ser}; +use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; use grin_store; /// Maximum number of block headers a peer should ever send @@ -95,6 +98,106 @@ impl From> for Error { } } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct PeerAddr(pub SocketAddr); + +impl Writeable for PeerAddr { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + match self.0 { + SocketAddr::V4(sav4) => { + ser_multiwrite!( + writer, + [write_u8, 0], + [write_fixed_bytes, &sav4.ip().octets().to_vec()], + [write_u16, sav4.port()] + ); + } + SocketAddr::V6(sav6) => { + writer.write_u8(1)?; + for seg in &sav6.ip().segments() { + writer.write_u16(*seg)?; + } + writer.write_u16(sav6.port())?; + } + } + Ok(()) + } +} + +impl Readable for PeerAddr { + fn read(reader: &mut dyn Reader) -> Result { + let v4_or_v6 = reader.read_u8()?; + if v4_or_v6 == 0 { + let ip = reader.read_fixed_bytes(4)?; + let port = reader.read_u16()?; + Ok(PeerAddr(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), + port, + )))) + } else { + let ip = try_iter_map_vec!(0..8, |_| reader.read_u16()); + let port = reader.read_u16()?; + Ok(PeerAddr(SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), + port, + 0, + 0, + )))) + } + } +} + +impl std::hash::Hash for PeerAddr { + /// If loopback address then we care about ip and port. + /// If regular address then we only care about the ip and ignore the port. + fn hash(&self, state: &mut H) { + if self.0.ip().is_loopback() { + self.0.hash(state); + } else { + self.0.ip().hash(state); + } + } +} + +impl PartialEq for PeerAddr { + /// If loopback address then we care about ip and port. + /// If regular address then we only care about the ip and ignore the port. + fn eq(&self, other: &PeerAddr) -> bool { + if self.0.ip().is_loopback() { + self.0 == other.0 + } else { + self.0.ip() == other.0.ip() + } + } +} + +impl Eq for PeerAddr {} + +impl std::fmt::Display for PeerAddr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl PeerAddr { + /// Convenient way of constructing a new peer_addr from an ip_addr + /// defaults to port 3414 on mainnet and 13414 on floonet. + pub fn from_ip(addr: IpAddr) -> PeerAddr { + let port = if global::is_floonet() { 13414 } else { 3414 }; + PeerAddr(SocketAddr::new(addr, port)) + } + + /// If the ip is loopback then our key is "ip:port" (mainly for local usernet testing). + /// Otherwise we only care about the ip (we disallow multiple peers on the same ip address). + pub fn as_key(&self) -> String { + if self.0.ip().is_loopback() { + format!("{}:{}", self.0.ip(), self.0.port()) + } else { + format!("{}", self.0.ip()) + } + } +} + /// Configuration for the peer-to-peer server. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct P2PConfig { @@ -106,18 +209,18 @@ pub struct P2PConfig { pub seeding_type: Seeding, /// The list of seed nodes, if using Seeding as a seed type - pub seeds: Option>, + pub seeds: Option>, /// Capabilities expose by this node, also conditions which other peers this /// node will have an affinity toward when connection. pub capabilities: Capabilities, - pub peers_allow: Option>, + pub peers_allow: Option>, - pub peers_deny: Option>, + pub peers_deny: Option>, /// The list of preferred peers that we will try to connect to - pub peers_preferred: Option>, + pub peers_preferred: Option>, pub ban_window: Option, @@ -125,7 +228,7 @@ pub struct P2PConfig { pub peer_min_preferred_count: Option, - pub dandelion_peer: Option, + pub dandelion_peer: Option, } /// Default address for peer-to-peer connections. @@ -178,7 +281,7 @@ impl P2PConfig { } /// Type of seeding the server will use to find other peers on the network. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] pub enum Seeding { /// No seeding, mostly for tests that programmatically connect None, @@ -262,7 +365,7 @@ pub struct PeerInfo { pub capabilities: Capabilities, pub user_agent: String, pub version: u32, - pub addr: SocketAddr, + pub addr: PeerAddr, pub direction: Direction, pub live_info: Arc>, } @@ -307,7 +410,7 @@ pub struct PeerInfoDisplay { pub capabilities: Capabilities, pub user_agent: String, pub version: u32, - pub addr: SocketAddr, + pub addr: PeerAddr, pub direction: Direction, pub total_difficulty: Difficulty, pub height: u64, @@ -353,22 +456,22 @@ pub trait ChainAdapter: Sync + Send { fn get_transaction(&self, kernel_hash: Hash) -> Option; - fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr); + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr); /// A block has been received from one of our peers. Returns true if the /// block could be handled properly and is not deemed defective by the /// chain. Returning false means the block will never be valid and /// may result in the peer being banned. - fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool; + fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool; - fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool; + fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool; - fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool; + fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool; /// A set of block header has been received, typically in response to a /// block /// header request. - fn headers_received(&self, bh: &[core::BlockHeader], addr: SocketAddr) -> bool; + fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool; /// Finds a list of block headers based on the provided locator. Tries to /// identify the common chain and gets the headers that follow it @@ -401,7 +504,7 @@ pub trait ChainAdapter: Sync + Send { /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be /// rewound to the provided indexes. - fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool; + fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool; } /// Additional methods required by the protocol that don't need to be @@ -409,14 +512,14 @@ pub trait ChainAdapter: Sync + Send { pub trait NetAdapter: ChainAdapter { /// Find good peers we know with the provided capability and return their /// addresses. - fn find_peer_addrs(&self, capab: Capabilities) -> Vec; + fn find_peer_addrs(&self, capab: Capabilities) -> Vec; /// A list of peers has been received from one of our peers. - fn peer_addrs_received(&self, _: Vec); + fn peer_addrs_received(&self, _: Vec); /// Heard total_difficulty from a connected peer (via ping/pong). - fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _: u64); + fn peer_difficulty(&self, _: PeerAddr, _: Difficulty, _: u64); /// Is this peer currently banned? - fn is_banned(&self, addr: SocketAddr) -> bool; + fn is_banned(&self, addr: PeerAddr) -> bool; } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index ae229f473..57c335e4d 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -25,6 +25,7 @@ use std::{thread, time}; use crate::core::core::hash::Hash; use crate::core::pow::Difficulty; +use crate::p2p::types::PeerAddr; use crate::p2p::Peer; fn open_port() -> u16 { @@ -70,7 +71,7 @@ fn peer_handshake() { let addr = SocketAddr::new(p2p_config.host, p2p_config.port); let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap(); - let my_addr = "127.0.0.1:5000".parse().unwrap(); + let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap()); let mut peer = Peer::connect( &mut socket, p2p::Capabilities::UNKNOWN, @@ -89,7 +90,7 @@ fn peer_handshake() { peer.send_ping(Difficulty::min(), 0).unwrap(); thread::sleep(time::Duration::from_secs(1)); - let server_peer = server.peers.get_connected_peer(&my_addr).unwrap(); + let server_peer = server.peers.get_connected_peer(my_addr).unwrap(); assert_eq!(server_peer.info.total_difficulty(), Difficulty::min()); assert!(server.peers.peer_count() > 0); } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 279919cab..ae160267b 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -17,7 +17,6 @@ use crate::util::RwLock; use std::fs::File; -use std::net::SocketAddr; use std::sync::{Arc, Weak}; use std::thread; use std::time::Instant; @@ -31,6 +30,7 @@ use crate::core::core::{BlockHeader, BlockSums, CompactBlock}; use crate::core::pow::Difficulty; use crate::core::{core, global}; use crate::p2p; +use crate::p2p::types::PeerAddr; use crate::pool; use crate::util::OneTime; use chrono::prelude::*; @@ -62,7 +62,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: SocketAddr) { + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) { // nothing much we can do with a new transaction while syncing if self.sync_state.is_syncing() { return; @@ -71,7 +71,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash); if tx.is_none() { - self.request_transaction(kernel_hash, &addr); + self.request_transaction(kernel_hash, addr); } } @@ -107,7 +107,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } - fn block_received(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool { + fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool { debug!( "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", b.hash(), @@ -120,7 +120,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { self.process_block(b, addr, was_requested) } - fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { + fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool { let bhash = cb.hash(); debug!( "Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.", @@ -187,7 +187,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { } else { if self.sync_state.status() == SyncStatus::NoSync { debug!("adapter: block invalid after hydration, requesting full block"); - self.request_block(&cb.header, &addr); + self.request_block(&cb.header, addr); true } else { debug!("block invalid after hydration, ignoring it, cause still syncing"); @@ -201,7 +201,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { } } - fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { + fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool { let bhash = bh.hash(); debug!( "Received block header {} at {} from {}, going to process.", @@ -227,13 +227,13 @@ impl p2p::ChainAdapter for NetToChainAdapter { // we have successfully processed a block header // so we can go request the block itself - self.request_compact_block(&bh, &addr); + self.request_compact_block(&bh, addr); // done receiving the header true } - fn headers_received(&self, bhs: &[core::BlockHeader], addr: SocketAddr) -> bool { + fn headers_received(&self, bhs: &[core::BlockHeader], addr: PeerAddr) -> bool { info!("Received {} block headers from {}", bhs.len(), addr,); if bhs.len() == 0 { @@ -342,7 +342,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be /// rewound to the provided indexes. - fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: SocketAddr) -> bool { + fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: PeerAddr) -> bool { // check status again after download, in case 2 txhashsets made it somehow if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { } else { @@ -421,7 +421,7 @@ impl NetToChainAdapter { // pushing the new block through the chain pipeline // remembering to reset the head if we have a bad block - fn process_block(&self, b: core::Block, addr: SocketAddr, was_requested: bool) -> bool { + fn process_block(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool { // We cannot process blocks earlier than the horizon so check for this here. { let head = self.chain().head().unwrap(); @@ -458,7 +458,7 @@ impl NetToChainAdapter { && !self.sync_state.is_syncing() { debug!("process_block: received an orphan block, checking the parent: {:}", previous.hash()); - self.request_block_by_hash(previous.hash(), &addr) + self.request_block_by_hash(previous.hash(), addr) } } true @@ -525,7 +525,7 @@ impl NetToChainAdapter { } } - fn request_transaction(&self, h: Hash, addr: &SocketAddr) { + fn request_transaction(&self, h: Hash, addr: PeerAddr) { self.send_tx_request_to_peer(h, addr, |peer, h| peer.send_tx_request(h)) } @@ -533,24 +533,24 @@ impl NetToChainAdapter { // it into a full block then fallback to requesting the full block // from the same peer that gave us the compact block // consider additional peers for redundancy? - fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) { + fn request_block(&self, bh: &BlockHeader, addr: PeerAddr) { self.request_block_by_hash(bh.hash(), addr) } - fn request_block_by_hash(&self, h: Hash, addr: &SocketAddr) { + fn request_block_by_hash(&self, h: Hash, addr: PeerAddr) { self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h)) } // After we have received a block header in "header first" propagation // we need to go request the block (compact representation) from the // same peer that gave us the header (unless we have already accepted the block) - fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) { + fn request_compact_block(&self, bh: &BlockHeader, addr: PeerAddr) { self.send_block_request_to_peer(bh.hash(), addr, |peer, h| { peer.send_compact_block_request(h) }) } - fn send_tx_request_to_peer(&self, h: Hash, addr: &SocketAddr, f: F) + fn send_tx_request_to_peer(&self, h: Hash, addr: PeerAddr, f: F) where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, { @@ -567,7 +567,7 @@ impl NetToChainAdapter { } } - fn send_block_request_to_peer(&self, h: Hash, addr: &SocketAddr, f: F) + fn send_block_request_to_peer(&self, h: Hash, addr: PeerAddr, f: F) where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, { diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index fa01cc5a1..dd6f2d134 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -21,12 +21,13 @@ use chrono::prelude::{DateTime, Utc}; use chrono::{Duration, MIN_DATE}; use rand::{thread_rng, Rng}; use std::collections::HashMap; -use std::net::{SocketAddr, ToSocketAddrs}; +use std::net::ToSocketAddrs; use std::sync::{mpsc, Arc}; use std::{cmp, str, thread, time}; use crate::core::global; use crate::p2p; +use crate::p2p::types::PeerAddr; use crate::p2p::ChainAdapter; use crate::pool::DandelionConfig; use crate::util::{Mutex, StopState}; @@ -52,8 +53,8 @@ pub fn connect_and_monitor( p2p_server: Arc, capabilities: p2p::Capabilities, dandelion_config: DandelionConfig, - seed_list: Box Vec + Send>, - preferred_peers: Option>, + seed_list: Box Vec + Send>, + preferred_peers: Option>, stop_state: Arc>, ) { let _ = thread::Builder::new() @@ -78,7 +79,7 @@ pub fn connect_and_monitor( let mut prev_ping = Utc::now(); let mut start_attempt = 0; - let mut connecting_history: HashMap> = HashMap::new(); + let mut connecting_history: HashMap> = HashMap::new(); loop { if stop_state.lock().is_stopped() { @@ -140,8 +141,8 @@ pub fn connect_and_monitor( fn monitor_peers( peers: Arc, config: p2p::P2PConfig, - tx: mpsc::Sender, - preferred_peers_list: Option>, + tx: mpsc::Sender, + preferred_peers_list: Option>, ) { // regularly check if we need to acquire more peers and if so, gets // them from db @@ -156,7 +157,7 @@ fn monitor_peers( let interval = Utc::now().timestamp() - x.last_banned; // Unban peer if interval >= config.ban_window() { - peers.unban_peer(&x.addr); + peers.unban_peer(x.addr); debug!( "monitor_peers: unbanned {} after {} seconds", x.addr, interval @@ -192,7 +193,7 @@ fn monitor_peers( // loop over connected peers // ask them for their list of peers - let mut connected_peers: Vec = vec![]; + let mut connected_peers: Vec = vec![]; for p in peers.connected_peers() { trace!( "monitor_peers: {}:{} ask {} for more peers", @@ -205,19 +206,16 @@ fn monitor_peers( } // Attempt to connect to preferred peers if there is some - match preferred_peers_list { - Some(preferred_peers) => { - for p in preferred_peers { - if !connected_peers.is_empty() { - if !connected_peers.contains(&p) { - tx.send(p).unwrap(); - } - } else { + if let Some(preferred_peers) = preferred_peers_list { + for p in preferred_peers { + if !connected_peers.is_empty() { + if !connected_peers.contains(&p) { tx.send(p).unwrap(); } + } else { + tx.send(p).unwrap(); } } - None => debug!("monitor_peers: no preferred peers"), } // take a random defunct peer and mark it healthy: over a long period any @@ -235,7 +233,7 @@ fn monitor_peers( config.peer_max_count() as usize, ); - for p in new_peers.iter().filter(|p| !peers.is_known(&p.addr)) { + for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) { trace!( "monitor_peers: on {}:{}, queue to soon try {}", config.host, @@ -265,9 +263,9 @@ fn update_dandelion_relay(peers: Arc, dandelion_config: DandelionCon // otherwise use the seeds provided. fn connect_to_seeds_and_preferred_peers( peers: Arc, - tx: mpsc::Sender, - seed_list: Box Vec>, - peers_preferred_list: Option>, + tx: mpsc::Sender, + seed_list: Box Vec>, + peers_preferred_list: Option>, ) { // check if we have some peers in db // look for peers that are able to give us other peers (via PEER_LIST capability) @@ -303,14 +301,14 @@ fn listen_for_addrs( peers: Arc, p2p: Arc, capab: p2p::Capabilities, - rx: &mpsc::Receiver, - connecting_history: &mut HashMap>, + rx: &mpsc::Receiver, + connecting_history: &mut HashMap>, ) { // Pull everything currently on the queue off the queue. // Does not block so addrs may be empty. // We will take(max_peers) from this later but we want to drain the rx queue // here to prevent it backing up. - let addrs: Vec = rx.try_iter().collect(); + let addrs: Vec = rx.try_iter().collect(); // If we have a healthy number of outbound peers then we are done here. if peers.healthy_peers_mix() { @@ -342,7 +340,7 @@ fn listen_for_addrs( let p2p_c = p2p.clone(); let _ = thread::Builder::new() .name("peer_connect".to_string()) - .spawn(move || match p2p_c.connect(&addr) { + .spawn(move || match p2p_c.connect(addr) { Ok(p) => { let _ = p.send_peer_request(capab); let _ = peers_c.update_state(addr, p2p::State::Healthy); @@ -368,9 +366,9 @@ fn listen_for_addrs( } } -pub fn dns_seeds() -> Box Vec + Send> { +pub fn dns_seeds() -> Box Vec + Send> { Box::new(|| { - let mut addresses: Vec = vec![]; + let mut addresses: Vec = vec![]; let net_seeds = if global::is_floonet() { FLOONET_DNS_SEEDS } else { @@ -384,7 +382,7 @@ pub fn dns_seeds() -> Box Vec + Send> { &mut (addrs .map(|mut addr| { addr.set_port(if global::is_floonet() { 13414 } else { 3414 }); - addr + PeerAddr(addr) }) .filter(|addr| !temp_addresses.contains(addr)) .collect()), @@ -399,26 +397,6 @@ pub fn dns_seeds() -> Box Vec + Send> { /// Convenience function when the seed list is immediately known. Mostly used /// for tests. -pub fn predefined_seeds(addrs_str: Vec) -> Box Vec + Send> { - Box::new(move || { - addrs_str - .iter() - .map(|s| s.parse().unwrap()) - .collect::>() - }) -} - -/// Convenience function when the seed list is immediately known. Mostly used -/// for tests. -pub fn preferred_peers(addrs_str: Vec) -> Option> { - if addrs_str.is_empty() { - None - } else { - Some( - addrs_str - .iter() - .map(|s| s.parse().unwrap()) - .collect::>(), - ) - } +pub fn predefined_seeds(addrs: Vec) -> Box Vec + Send> { + Box::new(move || addrs.clone()) } diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index ea2532a77..ac25eb3f7 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -16,7 +16,6 @@ //! the peer-to-peer server, the blockchain and the transaction pool) and acts //! as a facade. -use std::net::SocketAddr; use std::sync::Arc; use std::{thread, time}; @@ -35,6 +34,7 @@ use crate::grin::{dandelion_monitor, seed, sync}; use crate::mining::stratumserver; use crate::mining::test_miner::Miner; use crate::p2p; +use crate::p2p::types::PeerAddr; use crate::pool; use crate::store; use crate::util::file::get_first_line; @@ -103,7 +103,7 @@ impl Server { } /// Instantiates a new server associated with the provided future reactor. - pub fn new(mut config: ServerConfig) -> Result { + pub fn new(config: ServerConfig) -> Result { // Defaults to None (optional) in config file. // This translates to false here. let archive_mode = match config.archive_mode { @@ -178,30 +178,25 @@ impl Server { pool_net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone()); - if config.p2p_config.seeding_type.clone() != p2p::Seeding::Programmatic { - let seeder = match config.p2p_config.seeding_type.clone() { + if config.p2p_config.seeding_type != p2p::Seeding::Programmatic { + let seeder = match config.p2p_config.seeding_type { p2p::Seeding::None => { warn!("No seed configured, will stay solo until connected to"); seed::predefined_seeds(vec![]) } p2p::Seeding::List => { - seed::predefined_seeds(config.p2p_config.seeds.as_mut().unwrap().clone()) + seed::predefined_seeds(config.p2p_config.seeds.clone().unwrap()) } p2p::Seeding::DNSSeed => seed::dns_seeds(), _ => unreachable!(), }; - let peers_preferred = match config.p2p_config.peers_preferred.clone() { - Some(peers_preferred) => seed::preferred_peers(peers_preferred), - None => None, - }; - seed::connect_and_monitor( p2p_server.clone(), config.p2p_config.capabilities, config.dandelion_config.clone(), seeder, - peers_preferred, + config.p2p_config.peers_preferred.clone(), stop_state.clone(), ); } @@ -273,8 +268,8 @@ impl Server { } /// Asks the server to connect to a peer at the provided network address. - pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> { - self.p2p.connect(&addr)?; + pub fn connect_peer(&self, addr: PeerAddr) -> Result<(), Error> { + self.p2p.connect(addr)?; Ok(()) } diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index 1c21aeee6..81c049315 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -148,7 +148,7 @@ impl HeaderSync { && highest_height == peer.info.height() { self.peers - .ban_peer(&peer.info.addr, ReasonForBan::FraudHeight); + .ban_peer(peer.info.addr, ReasonForBan::FraudHeight); info!( "sync: ban a fraud peer: {}, claimed height: {}, total difficulty: {}", peer.info.addr, diff --git a/servers/tests/framework.rs b/servers/tests/framework.rs index d0bc36d71..5b9f8b28d 100644 --- a/servers/tests/framework.rs +++ b/servers/tests/framework.rs @@ -13,6 +13,7 @@ // limitations under the License. use self::keychain::Keychain; +use self::p2p::PeerAddr; use self::util::Mutex; use self::wallet::{HTTPNodeClient, HTTPWalletCommAdapter, LMDBBackend, WalletConfig}; use blake2_rfc as blake2; @@ -191,7 +192,7 @@ impl LocalServerContainer { if self.config.seed_addr.len() > 0 { seeding_type = p2p::Seeding::List; - seeds = vec![self.config.seed_addr.to_string()]; + seeds = vec![PeerAddr(self.config.seed_addr.parse().unwrap())]; } let s = servers::Server::new(servers::ServerConfig { @@ -233,9 +234,9 @@ impl LocalServerContainer { s.start_test_miner(wallet_url, s.stop_state.clone()); } - for p in &mut self.peer_list { + for p in &self.peer_list { println!("{} connecting to peer: {}", self.config.p2p_server_port, p); - let _ = s.connect_peer(p.parse().unwrap()); + let _ = s.connect_peer(PeerAddr(p.parse().unwrap())); } if self.wallet_is_running { @@ -647,7 +648,9 @@ pub fn config(n: u16, test_name_dir: &str, seed_n: u16) -> servers::ServerConfig p2p_config: p2p::P2PConfig { port: 10000 + n, seeding_type: p2p::Seeding::List, - seeds: Some(vec![format!("127.0.0.1:{}", 10000 + seed_n)]), + seeds: Some(vec![PeerAddr( + format!("127.0.0.1:{}", 10000 + seed_n).parse().unwrap(), + )]), ..p2p::P2PConfig::default() }, chain_type: core::global::ChainTypes::AutomatedTesting, diff --git a/servers/tests/simulnet.rs b/servers/tests/simulnet.rs index 424ab0f02..201b03647 100644 --- a/servers/tests/simulnet.rs +++ b/servers/tests/simulnet.rs @@ -19,6 +19,7 @@ mod framework; use self::core::core::hash::Hashed; use self::core::global::{self, ChainTypes}; +use self::p2p::PeerAddr; use self::util::{Mutex, StopState}; use self::wallet::controller; use self::wallet::libwallet::types::{WalletBackend, WalletInst}; @@ -933,7 +934,9 @@ fn replicate_tx_fluff_failure() { // Server 2 (another node) let mut s2_config = framework::config(3001, "tx_fluff", 3001); - s2_config.p2p_config.seeds = Some(vec!["127.0.0.1:13000".to_owned()]); + s2_config.p2p_config.seeds = Some(vec![PeerAddr( + "127.0.0.1:13000".to_owned().parse().unwrap(), + )]); s2_config.dandelion_config.embargo_secs = Some(10); s2_config.dandelion_config.patience_secs = Some(1); s2_config.dandelion_config.relay_secs = Some(1); @@ -944,7 +947,9 @@ fn replicate_tx_fluff_failure() { for i in 0..dl_nodes { // (create some stem nodes) let mut s_config = framework::config(3002 + i, "tx_fluff", 3002 + i); - s_config.p2p_config.seeds = Some(vec!["127.0.0.1:13000".to_owned()]); + s_config.p2p_config.seeds = Some(vec![PeerAddr( + "127.0.0.1:13000".to_owned().parse().unwrap(), + )]); s_config.dandelion_config.embargo_secs = Some(10); s_config.dandelion_config.patience_secs = Some(1); s_config.dandelion_config.relay_secs = Some(1); diff --git a/src/bin/cmd/server.rs b/src/bin/cmd/server.rs index 1d3b03c1a..0a21c5166 100644 --- a/src/bin/cmd/server.rs +++ b/src/bin/cmd/server.rs @@ -24,7 +24,7 @@ use ctrlc; use crate::config::GlobalConfig; use crate::core::global; -use crate::p2p::Seeding; +use crate::p2p::{PeerAddr, Seeding}; use crate::servers; use crate::tui::ui; @@ -116,45 +116,15 @@ pub fn server_command( } if let Some(seeds) = a.values_of("seed") { + let seed_addrs = seeds + .filter_map(|x| x.parse().ok()) + .map(|x| PeerAddr(x)) + .collect(); server_config.p2p_config.seeding_type = Seeding::List; - server_config.p2p_config.seeds = Some(seeds.map(|s| s.to_string()).collect()); + server_config.p2p_config.seeds = Some(seed_addrs); } } - /*if let Some(true) = server_config.run_wallet_listener { - let mut wallet_config = global_config.members.as_ref().unwrap().wallet.clone(); - wallet::init_wallet_seed(wallet_config.clone()); - let wallet = wallet::instantiate_wallet(wallet_config.clone(), ""); - - let _ = thread::Builder::new() - .name("wallet_listener".to_string()) - .spawn(move || { - controller::foreign_listener(wallet, &wallet_config.api_listen_addr()) - .unwrap_or_else(|e| { - panic!( - "Error creating wallet listener: {:?} Config: {:?}", - e, wallet_config - ) - }); - }); - } - if let Some(true) = server_config.run_wallet_owner_api { - let mut wallet_config = global_config.members.unwrap().wallet; - let wallet = wallet::instantiate_wallet(wallet_config.clone(), ""); - wallet::init_wallet_seed(wallet_config.clone()); - - let _ = thread::Builder::new() - .name("wallet_owner_listener".to_string()) - .spawn(move || { - controller::owner_listener(wallet, "127.0.0.1:13420").unwrap_or_else(|e| { - panic!( - "Error creating wallet api listener: {:?} Config: {:?}", - e, wallet_config - ) - }); - }); - }*/ - if let Some(a) = server_args { match a.subcommand() { ("run", _) => {