diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 062299404..1a2b7d100 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -89,6 +89,12 @@ impl NetAdapter for NetToChainAdapter { } fn headers_received(&self, bhs: Vec) { + info!( + LOGGER, + "Received {} block headers", + bhs.len(), + ); + // try to add each header to our header chain let mut added_hs = vec![]; for bh in bhs { @@ -134,11 +140,19 @@ impl NetAdapter for NetToChainAdapter { } fn locate_headers(&self, locator: Vec) -> Vec { + debug!( + LOGGER, + "locate_headers: {:?}", + locator, + ); + if locator.len() == 0 { return vec![]; } - // go through the locator vector and check if we know any of these headers + // recursively go back through the locator vector + // and stop when we find a header that we recognize + // this will be a header shared in common between us and the peer let known = self.chain.get_block_header(&locator[0]); let header = match known { Ok(header) => header, @@ -151,6 +165,12 @@ impl NetAdapter for NetToChainAdapter { } }; + debug!( + LOGGER, + "locate_headers: {:?}", + header, + ); + // looks like we know one, getting as many following headers as allowed let hh = header.height; let mut headers = vec![]; @@ -165,6 +185,13 @@ impl NetAdapter for NetToChainAdapter { } } } + + debug!( + LOGGER, + "locate_headers: returning headers: {}", + headers.len(), + ); + headers } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 44a9844e5..6be5f4dc7 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -63,7 +63,7 @@ impl Seeder { seed_list: Box, Error = String>>, ) { // open a channel with a listener that connects every peer address sent below - // max peer count + // max peer count let (tx, rx) = futures::sync::mpsc::unbounded(); h.spawn(self.listen_for_addrs(h.clone(), rx)); @@ -85,12 +85,14 @@ impl Seeder { let p2p_server = self.p2p.clone(); // now spawn a new future to regularly check if we need to acquire more peers - // and if so, gets them from db + // and if so, gets them from db let mon_loop = Timer::default() .interval(time::Duration::from_secs(10)) .for_each(move |_| { + debug!(LOGGER, "monitoring peers ({})", p2p_server.all_peers().len()); + // maintenance step first, clean up p2p server peers and mark bans - // if needed + // if needed let disconnected = p2p_server.clean_peers(); for p in disconnected { if p.is_banned() { diff --git a/grin/src/server.rs b/grin/src/server.rs index d73b86102..723fcb313 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -122,7 +122,11 @@ impl Server { Seeding::None => { warn!( LOGGER, - "No seed configured, will stay solo until connected to" + "No seed(s) configured, will stay solo until connected to" + ); + seed.connect_and_monitor( + evt_handle.clone(), + seed::predefined_seeds(vec![]), ); } Seeding::List => { @@ -132,12 +136,16 @@ impl Server { ); } Seeding::WebStatic => { - seed.connect_and_monitor(evt_handle.clone(), seed::web_seeds(evt_handle.clone())); + seed.connect_and_monitor( + evt_handle.clone(), + seed::web_seeds(evt_handle.clone()), + ); } _ => {} } - if config.seeding_type != Seeding::None { + // If we have any known seeds or peers then attempt to sync. + if config.seeding_type != Seeding::None || peer_store.all_peers().len() > 0 { let sync = sync::Syncer::new(shared_chain.clone(), p2p_server.clone()); net_adapter.start_sync(sync); } diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 5b78e5116..58e725254 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -70,27 +70,41 @@ impl Syncer { /// Checks the local chain state, comparing it with our peers and triggers /// syncing if required. pub fn run(&self) -> Result<(), Error> { - debug!(LOGGER, "Starting syncer."); + info!(LOGGER, "Sync: starting sync"); + + // Loop for 10s waiting for some peers to potentially sync from. let start = Instant::now(); loop { let pc = self.p2p.peer_count(); if pc > 3 { break; } - if pc > 0 && (Instant::now() - start > Duration::from_secs(10)) { + if Instant::now() - start > Duration::from_secs(10) { break; } thread::sleep(Duration::from_millis(200)); } + // Now check we actually have at least one peer to sync from. + // If not then end the sync cleanly. + if self.p2p.peer_count() == 0 { + info!(LOGGER, "Sync: no peers to sync with, done."); + + let mut sync = self.sync.lock().unwrap(); + *sync = false; + + return Ok(()) + } + // check if we have missing full blocks for which we already have a header self.init_download()?; // main syncing loop, requests more headers and bodies periodically as long - // as a peer with higher difficulty exists and we're not fully caught up + // as a peer with higher difficulty exists and we're not fully caught up info!(LOGGER, "Starting sync loop."); loop { let tip = self.chain.get_header_head()?; + // TODO do something better (like trying to get more) if we lose peers let peer = self.p2p.most_work_peer().unwrap(); debug!( @@ -104,6 +118,12 @@ impl Syncer { let more_bodies = { let blocks_to_download = self.blocks_to_download.lock().unwrap(); let blocks_downloading = self.blocks_downloading.lock().unwrap(); + debug!( + LOGGER, + "Sync: blocks to download {}, block downloading {}", + blocks_to_download.len(), + blocks_downloading.len(), + ); blocks_to_download.len() > 0 || blocks_downloading.len() > 0 }; @@ -125,7 +145,7 @@ impl Syncer { thread::sleep(Duration::from_secs(2)); } - info!(LOGGER, "Sync done."); + info!(LOGGER, "Sync: done."); Ok(()) } diff --git a/grin/tests/framework/mod.rs b/grin/tests/framework/mod.rs index 9447f71a5..fad2b042d 100644 --- a/grin/tests/framework/mod.rs +++ b/grin/tests/framework/mod.rs @@ -44,7 +44,7 @@ use wallet::WalletConfig; /// Just removes all results from previous runs pub fn clean_all_output(test_name_dir: &str) { - let target_dir = format!("target/test_servers/{}", test_name_dir); + let target_dir = format!("target/{}", test_name_dir); let result = fs::remove_dir_all(target_dir); if let Err(e) = result { println!("{}", e); diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index d328e3b39..80de80acf 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -191,7 +191,6 @@ fn a_simulate_block_propagation() { let test_name_dir = "grin-prop"; framework::clean_all_output(test_name_dir); - let mut evtlp = reactor::Core::new().unwrap(); let handle = evtlp.handle(); @@ -221,6 +220,8 @@ fn a_simulate_block_propagation() { port: 18000 + n, ..p2p::P2PConfig::default() }), + seeding_type: grin::Seeding::List, + seeds: Some(vec!["127.0.0.1:18000".to_string()]), ..Default::default() }, &handle, @@ -228,23 +229,12 @@ fn a_simulate_block_propagation() { servers.push(s); } - // everyone connects to everyone else - for n in 0..5 { - for m in 0..5 { - if m == n { - continue; - } - let addr = format!("{}:{}", "127.0.0.1", 18000 + m); - servers[n].connect_peer(addr.parse().unwrap()).unwrap(); - } - } - // start mining servers[0].start_miner(miner_config); let original_height = servers[0].head().height; // monitor for a change of head on a different server and check whether - // chain height has changed + // chain height has changed evtlp.run(change(&servers[4]).and_then(|tip| { assert!(tip.height == original_height + 1); Ok(()) @@ -283,16 +273,16 @@ fn simulate_full_sync() { let mut servers = vec![]; for n in 0..2 { let mut config = grin::ServerConfig { + api_http_addr: format!("127.0.0.1:{}", 19000 + n), db_root: format!("target/{}/grin-sync-{}", test_name_dir, n), p2p_config: Some(p2p::P2PConfig { port: 11000 + n, ..p2p::P2PConfig::default() }), + seeding_type: grin::Seeding::List, + seeds: Some(vec!["127.0.0.1:11000".to_string()]), ..Default::default() }; - if n == 1 { - config.seeding_type = grin::Seeding::Programmatic; - } let s = grin::Server::future(config, &handle).unwrap(); servers.push(s); } @@ -301,10 +291,6 @@ fn simulate_full_sync() { servers[0].start_miner(miner_config); thread::sleep(time::Duration::from_secs(5)); - // connect 1 and 2 - let addr = format!("{}:{}", "127.0.0.1", 11001); - servers[0].connect_peer(addr.parse().unwrap()).unwrap(); - // 2 should get blocks evtlp.run(change(&servers[1])); } diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 836f0e9d4..6fcab2402 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -57,8 +57,16 @@ impl Handshake { self_addr: SocketAddr, conn: TcpStream, ) -> Box> { - // prepare the first part of the hanshake + // prepare the first part of the handshake let nonce = self.next_nonce(); + debug!( + LOGGER, + "handshake connect with nonce - {}, sender - {:?}, receiver - {:?}", + nonce, + self_addr, + conn.peer_addr().unwrap(), + ); + let hand = Hand { version: PROTOCOL_VERSION, capabilities: capab, @@ -117,7 +125,14 @@ impl Handshake { { // check the nonce to see if we could be trying to connect to ourselves let nonces = nonces.read().unwrap(); + debug!( + LOGGER, + "checking the nonce - {}, {:?}", + &hand.nonce, + nonces, + ); if nonces.contains(&hand.nonce) { + debug!(LOGGER, "***** nonce matches! Avoiding connecting to ourselves"); return Err(Error::Serialization(ser::Error::UnexpectedData { expected: vec![], received: vec![], @@ -173,6 +188,8 @@ fn extract_ip(advertised: &SocketAddr, conn: &TcpStream) -> SocketAddr { match advertised { &SocketAddr::V4(v4sock) => { let ip = v4sock.ip(); + debug!(LOGGER, "extract_ip - {:?}, {:?}", ip, conn.peer_addr()); + if ip.is_loopback() || ip.is_unspecified() { if let Ok(addr) = conn.peer_addr() { return SocketAddr::new(addr.ip(), advertised.port()); diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 9b4815d37..5ee542d9d 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -62,7 +62,7 @@ impl Peer { capab: Capabilities, total_difficulty: Difficulty, self_addr: SocketAddr, - hs: &Handshake, + hs: Arc, na: Arc, ) -> Box> { let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn) diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 2023c9b31..507a25419 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -16,6 +16,7 @@ //! other peers in the network. use std::cell::RefCell; +use std::collections::HashMap; use std::net::SocketAddr; use std::ops::Deref; use std::sync::{Arc, RwLock}; @@ -63,7 +64,8 @@ impl NetAdapter for DummyAdapter { pub struct Server { config: P2PConfig, capabilities: Capabilities, - peers: Arc>>>, + peers: Arc>>>, + handshake: Arc, adapter: Arc, stop: RefCell>>, } @@ -78,7 +80,8 @@ impl Server { Server { config: config, capabilities: capab, - peers: Arc::new(RwLock::new(Vec::new())), + peers: Arc::new(RwLock::new(HashMap::new())), + handshake: Arc::new(Handshake::new()), adapter: adapter, stop: RefCell::new(None), } @@ -91,7 +94,7 @@ impl Server { let socket = TcpListener::bind(&addr, &h.clone()).unwrap(); warn!(LOGGER, "P2P server started on {}", addr); - let hs = Arc::new(Handshake::new()); + let handshake = self.handshake.clone(); let peers = self.peers.clone(); let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); @@ -104,7 +107,7 @@ impl Server { let peers = peers.clone(); // accept the peer and add it to the server map - let accept = Peer::accept(conn, capab, total_diff, &hs.clone(), adapter.clone()); + let accept = Peer::accept(conn, capab, total_diff, &handshake.clone(), adapter.clone()); let added = add_to_peers(peers, adapter, accept); // wire in a future to timeout the accept after 5 secs @@ -153,13 +156,10 @@ impl Server { // if we're already connected to the addr, just return the peer return Box::new(future::ok(Some(p))); } - if self.is_self(addr) { - // asked to connect to ourselves - return Box::new(future::ok(None)); - } // cloneapalooza let peers = self.peers.clone(); + let handshake = self.handshake.clone(); let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); let self_addr = SocketAddr::new(self.config.host, self.config.port); @@ -174,13 +174,13 @@ impl Server { let total_diff = adapter.clone().total_difficulty(); // connect to the peer and add it to the server map, wiring it a timeout for - // the handhake + // the handshake let connect = Peer::connect( socket, capab, total_diff, self_addr, - &Handshake::new(), + handshake.clone(), adapter.clone(), ); let added = add_to_peers(peers, adapter, connect); @@ -196,53 +196,47 @@ impl Server { Box::new(request) } - /// Check if the server already knows this peer (is already connected). In - /// addition we consider to know ourselves. + /// Check if the server already knows this peer (is already connected). pub fn is_known(&self, addr: SocketAddr) -> bool { - self.get_peer(addr).is_some() || self.is_self(addr) - } - - /// Whether the provided address is ourselves. - pub fn is_self(&self, addr: SocketAddr) -> bool { - addr.ip() == self.config.host && addr.port() == self.config.port + self.get_peer(addr).is_some() } pub fn all_peers(&self) -> Vec> { - self.peers.read().unwrap().clone() + self.peers.read().unwrap().values().map(|p| p.clone()).collect() } /// Get a peer we're connected to by address. pub fn get_peer(&self, addr: SocketAddr) -> Option> { - for p in self.peers.read().unwrap().deref() { - if p.info.addr == addr { - return Some((*p).clone()); - } - } - None + self.peers.read().unwrap().get(&addr).map(|p| p.clone()) } /// Have the server iterate over its peer list and prune all peers we have /// lost connection to or have been deemed problematic. The removed peers /// are returned. pub fn clean_peers(&self) -> Vec> { - let mut peers = self.peers.write().unwrap(); + let mut rm = vec![]; - let (keep, rm) = peers.iter().fold((vec![], vec![]), |mut acc, ref p| { - if p.clone().is_connected() { - acc.0.push((*p).clone()); - } else { - acc.1.push((*p).clone()); + // build a list of peers to be cleaned up + for peer in self.all_peers() { + if !peer.is_connected() { + debug!(LOGGER, "cleaning {:?}, not connected", peer.info.addr); + rm.push(peer); } - acc - }); - *peers = keep; + } + + // now clean up peer map based on the list to remove + let mut peers = self.peers.write().unwrap(); + for p in rm.clone() { + peers.remove(&p.info.addr); + } + rm } /// Returns the peer with the most worked branch, showing the highest total /// difficulty. pub fn most_work_peer(&self) -> Option> { - let peers = self.peers.read().unwrap(); + let peers = self.all_peers(); if peers.len() == 0 { return None; } @@ -257,7 +251,7 @@ impl Server { /// Returns a random peer we're connected to. pub fn random_peer(&self) -> Option> { - let peers = self.peers.read().unwrap(); + let peers = self.all_peers(); if peers.len() == 0 { None } else { @@ -270,7 +264,7 @@ impl Server { /// may drop the broadcast request if it knows the remote peer already has /// the block. pub fn broadcast_block(&self, b: &core::Block) { - let peers = self.peers.write().unwrap(); + let peers = self.all_peers(); for p in peers.deref() { if p.is_connected() { if let Err(e) = p.send_block(b) { @@ -284,7 +278,7 @@ impl Server { /// implementation may drop the broadcast request if it knows the /// remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let peers = self.peers.write().unwrap(); + let peers = self.all_peers(); for p in peers.deref() { if p.is_connected() { if let Err(e) = p.send_transaction(tx) { @@ -301,7 +295,8 @@ impl Server { /// Stops the server. Disconnect from all peers at the same time. pub fn stop(self) { - let peers = self.peers.write().unwrap(); + info!(LOGGER, "calling stop on server"); + let peers = self.all_peers(); for p in peers.deref() { p.stop(); } @@ -311,7 +306,7 @@ impl Server { // Adds the peer built by the provided future in the peers map fn add_to_peers( - peers: Arc>>>, + peers: Arc>>>, adapter: Arc, peer_fut: A, ) -> Box), ()>, Error = Error>> @@ -320,9 +315,10 @@ where { let peer_add = peer_fut.into_future().map(move |(conn, peer)| { adapter.peer_connected(&peer.info); + let addr = peer.info.addr.clone(); let apeer = Arc::new(peer); let mut peers = peers.write().unwrap(); - peers.push(apeer.clone()); + peers.insert(addr, apeer.clone()); Ok((conn, apeer)) }); Box::new(peer_add) diff --git a/p2p/src/store.rs b/p2p/src/store.rs index c120f12d4..0b3aa0f11 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -21,6 +21,7 @@ use core::ser::{self, Readable, Reader, Writeable, Writer}; use grin_store::{self, option_to_not_found, to_key, Error}; use msg::SockAddr; use types::Capabilities; +use util::LOGGER; const STORE_SUBPATH: &'static str = "peers"; @@ -94,11 +95,8 @@ impl PeerStore { } pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { - // we want to ignore any peer without a well-defined ip - let ip = p.addr.ip(); - if ip.is_unspecified() || ip.is_loopback() { - return Ok(()); - } + debug!(LOGGER, "saving peer to store {:?}", p); + self.db.put_ser( &to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], p, diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 3c59dc0df..93f6ea500 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -59,7 +59,7 @@ fn peer_handshake() { p2p::UNKNOWN, Difficulty::one(), my_addr, - &p2p::handshake::Handshake::new(), + Arc::new(p2p::handshake::Handshake::new()), net_adapter.clone(), ) })