diff --git a/api/src/handlers.rs b/api/src/handlers.rs index 083b65024..870bee3db 100644 --- a/api/src/handlers.rs +++ b/api/src/handlers.rs @@ -229,12 +229,12 @@ impl Handler for SumTreeHandler { } pub struct PeersAllHandler { - pub peer_store: Arc<p2p::PeerStore>, + pub p2p_server: Arc<p2p::Server>, } impl Handler for PeersAllHandler { fn handle(&self, _req: &mut Request) -> IronResult<Response> { - let peers = &self.peer_store.all_peers(); + let peers = &self.p2p_server.all_peers(); json_response_pretty(&peers) } } @@ -246,7 +246,7 @@ pub struct PeersConnectedHandler { impl Handler for PeersConnectedHandler { fn handle(&self, _req: &mut Request) -> IronResult<Response> { let mut peers = vec![]; - for p in &self.p2p_server.all_peers() { + for p in &self.p2p_server.connected_peers() { let p = p.read().unwrap(); let peer_info = p.info.clone(); peers.push(peer_info); @@ -374,7 +374,6 @@ pub fn start_rest_apis<T>( chain: Arc<chain::Chain>, tx_pool: Arc<RwLock<pool::TransactionPool<T>>>, p2p_server: Arc<p2p::Server>, - peer_store: Arc<p2p::PeerStore>, ) where T: pool::BlockChain + Send + Sync + 'static, { @@ -396,7 +395,7 @@ pub fn start_rest_apis<T>( tx_pool: tx_pool.clone(), }; let peers_all_handler = PeersAllHandler { - peer_store: peer_store.clone(), + p2p_server: p2p_server.clone(), }; let peers_connected_handler = PeersConnectedHandler { p2p_server: p2p_server.clone(), diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index 8a21d647c..27c3e6a2b 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -53,7 +53,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er debug!( LOGGER, - "Processing block {} at {} with {} inputs and {} outputs.", + "pipe: process_block {} at {} with {} inputs and {} outputs.", b.hash(), b.header.height, b.inputs.len(), @@ -78,9 +78,9 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er validate_block(b, &mut ctx, &mut extension)?; debug!( LOGGER, - "Block at {} with hash {} is valid, going to save and append.", + "pipe: proces_block {} at {} is valid, save and append.", + b.hash(), b.header.height, - b.hash() ); add_block(b, &mut ctx)?; @@ -96,7 +96,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er pub fn process_block_header(bh: &BlockHeader, mut ctx: BlockContext) -> Result<Option<Tip>, Error> { debug!( LOGGER, - "Processing header {} at {}.", + "pipe: process_header {} at {}", bh.hash(), bh.height ); @@ -119,7 +119,7 @@ fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> { } if let Ok(b) = ctx.store.get_block(&bh) { // there is a window where a block can be saved but the chain head not - // updated yet, we plug that window here by re-accepting the block + // updated yet, we plug that window here by re-accepting the block if b.header.total_difficulty <= ctx.head.total_difficulty { return Err(Error::Unfit("already in store".to_string())); } @@ -157,7 +157,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E if !ctx.opts.intersects(SKIP_POW) { let cycle_size = global::sizeshift(); - debug!(LOGGER, "Validating block with cuckoo size {}", cycle_size); + debug!(LOGGER, "pipe: validate_header cuckoo size {}", cycle_size); if !(ctx.pow_verifier)(header, cycle_size as u32) { return Err(Error::InvalidPow); } @@ -220,7 +220,7 @@ fn validate_block( ext.apply_block(b)?; } else { // extending a fork, first identify the block where forking occurred - // keeping the hashes of blocks along the fork + // keeping the hashes of blocks along the fork let mut current = b.header.previous; let mut hashes = vec![]; loop { @@ -290,7 +290,7 @@ fn validate_block( .get_block_header_by_output_commit(&input.commitment()) { // TODO - make sure we are not off-by-1 here vs. the equivalent tansaction - // validation rule + // validation rule if b.header.height <= output_header.height + global::coinbase_maturity() { return Err(Error::ImmatureCoinbase); } @@ -321,7 +321,7 @@ fn add_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Erro /// work than the head. fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> { // if we made a fork with more work than the head (which should also be true - // when extending the head), update it + // when extending the head), update it let tip = Tip::from_block(&b.header); if tip.total_difficulty > ctx.head.total_difficulty { // update the block height index @@ -330,8 +330,8 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> .map_err(|e| Error::StoreErr(e, "pipe setup height".to_owned()))?; // in sync mode, only update the "body chain", otherwise update both the - // "header chain" and "body chain", updating the header chain in sync resets - // all additional "future" headers we've received + // "header chain" and "body chain", updating the header chain in sync resets + // all additional "future" headers we've received if ctx.opts.intersects(SYNC) { ctx.store .save_body_head(&tip) @@ -359,7 +359,7 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> /// work than the head. fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> { // if we made a fork with more work than the head (which should also be true - // when extending the head), update it + // when extending the head), update it let tip = Tip::from_block(bh); if tip.total_difficulty > ctx.head.total_difficulty { ctx.store @@ -372,7 +372,7 @@ fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option "Updated block header head to {} at {}.", bh.hash(), bh.height - ); + ); Ok(Some(tip)) } else { Ok(None) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 258c96163..3a0d78dee 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -12,22 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; -use std::thread; +use std::sync::atomic::{AtomicBool, Ordering}; use chain::{self, ChainAdapter}; use core::core::{self, Output}; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use p2p::{self, NetAdapter, Peer, PeerData, PeerStore, Server, State}; +use p2p::{self, NetAdapter, PeerData, State}; use pool; use util::secp::pedersen::Commitment; use util::OneTime; use store; -use sync; use util::LOGGER; /// Implementation of the NetAdapter for the blockchain. Gets notified when new @@ -35,10 +33,9 @@ use util::LOGGER; /// implementations. pub struct NetToChainAdapter { chain: Arc<chain::Chain>, - peer_store: Arc<PeerStore>, - connected_peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>, + p2p_server: OneTime<Arc<p2p::Server>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, - syncer: OneTime<Arc<sync::Syncer>>, + syncing: AtomicBool, } impl NetAdapter for NetToChainAdapter { @@ -79,20 +76,14 @@ impl NetAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); } - - if self.syncing() { - // always notify the syncer we received a block - // otherwise we jam up the 8 download slots with orphans - debug!(LOGGER, "adapter: notifying syncer: received block {:?}", bhash); - self.syncer.borrow().block_received(bhash); - } } - fn headers_received(&self, bhs: Vec<core::BlockHeader>) { + fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) { info!( LOGGER, - "Received {} block headers", - bhs.len(), + "Received block headers {:?} from {}", + bhs.iter().map(|x| x.hash()).collect::<Vec<_>>(), + addr, ); // try to add each header to our header chain @@ -133,10 +124,6 @@ impl NetAdapter for NetToChainAdapter { "Added {} headers to the header chain.", added_hs.len() ); - - if self.syncing() { - self.syncer.borrow().headers_received(added_hs); - } } fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> { @@ -207,7 +194,7 @@ impl NetAdapter for NetToChainAdapter { /// Find good peers we know with the provided capability and return their /// addresses. fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> { - let peers = self.peer_store + let peers = self.p2p_server.borrow() .find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize); debug!(LOGGER, "Got {} peer addrs to send.", peers.len()); map_vec!(peers, |p| p.addr) @@ -217,7 +204,7 @@ impl NetAdapter for NetToChainAdapter { fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) { debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len()); for pa in peer_addrs { - if let Ok(e) = self.peer_store.exists_peer(pa) { + if let Ok(e) = self.p2p_server.borrow().exists_peer(pa) { if e { continue; } @@ -228,7 +215,7 @@ impl NetAdapter for NetToChainAdapter { user_agent: "".to_string(), flags: State::Healthy, }; - if let Err(e) = self.peer_store.save_peer(&peer) { + if let Err(e) = self.p2p_server.borrow().save_peer(&peer) { error!(LOGGER, "Could not save received peer address: {:?}", e); } } @@ -243,7 +230,7 @@ impl NetAdapter for NetToChainAdapter { user_agent: pi.user_agent.clone(), flags: State::Healthy, }; - if let Err(e) = self.peer_store.save_peer(&peer) { + if let Err(e) = self.p2p_server.borrow().save_peer(&peer) { error!(LOGGER, "Could not save connected peer: {:?}", e); } } @@ -258,8 +245,7 @@ impl NetAdapter for NetToChainAdapter { ); if diff.into_num() > 0 { - let peers = self.connected_peers.read().unwrap(); - if let Some(peer) = peers.get(&addr) { + if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) { let mut peer = peer.write().unwrap(); peer.info.total_difficulty = diff; } @@ -271,40 +257,54 @@ impl NetToChainAdapter { pub fn new( chain_ref: Arc<chain::Chain>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, - peer_store: Arc<PeerStore>, - connected_peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>, ) -> NetToChainAdapter { NetToChainAdapter { chain: chain_ref, - peer_store: peer_store, - connected_peers: connected_peers, + p2p_server: OneTime::new(), tx_pool: tx_pool, - syncer: OneTime::new(), + syncing: AtomicBool::new(true), } } - /// Start syncing the chain by instantiating and running the Syncer in the - /// background (a new thread is created). - pub fn start_sync(&self, sync: sync::Syncer) { - let arc_sync = Arc::new(sync); - self.syncer.init(arc_sync.clone()); - let _ = thread::Builder::new() - .name("syncer".to_string()) - .spawn(move || { - let res = arc_sync.run(); - if let Err(e) = res { - panic!("Error during sync, aborting: {:?}", e); - } - }); + /// Setup the p2p server on the adapter + pub fn init(&self, p2p: Arc<p2p::Server>) { + self.p2p_server.init(p2p); } - pub fn syncing(&self) -> bool { - self.syncer.is_initialized() && self.syncer.borrow().syncing() + /// Whether we're currently syncing the chain or we're fully caught up and + /// just receiving blocks through gossip. + pub fn is_syncing(&self) -> bool { + let local_diff = self.total_difficulty(); + let peers = self.p2p_server.borrow().connected_peers(); + + // if we're already syncing, we're caught up if no peer has a higher + // difficulty than us + if self.syncing.load(Ordering::Relaxed) { + let higher_diff = peers.iter().any(|p| { + let p = p.read().unwrap(); + p.info.total_difficulty > local_diff + }); + if !higher_diff { + info!(LOGGER, "sync: caught up on the most worked chain, disabling sync"); + self.syncing.store(false, Ordering::Relaxed); + } + } else { + // if we're not syncing, we need to if our difficulty is much too low + let higher_diff_padded = peers.iter().any(|p| { + let p = p.read().unwrap(); + p.info.total_difficulty > local_diff.clone() + Difficulty::from_num(1000) + }); + if higher_diff_padded { + info!(LOGGER, "sync: late on the most worked chain, enabling sync"); + self.syncing.store(true, Ordering::Relaxed); + } + } + self.syncing.load(Ordering::Relaxed) } /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { - let opts = if self.syncing() { + let opts = if self.is_syncing() { chain::SYNC } else { chain::NONE @@ -318,7 +318,7 @@ impl NetToChainAdapter { /// the network to broadcast the block pub struct ChainToPoolAndNetAdapter { tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, - p2p: OneTime<Arc<Server>>, + p2p: OneTime<Arc<p2p::Server>>, } impl ChainAdapter for ChainToPoolAndNetAdapter { @@ -346,7 +346,7 @@ impl ChainToPoolAndNetAdapter { p2p: OneTime::new(), } } - pub fn init(&self, p2p: Arc<Server>) { + pub fn init(&self, p2p: Arc<p2p::Server>) { self.p2p.init(p2p); } } @@ -354,7 +354,7 @@ impl ChainToPoolAndNetAdapter { /// Adapter between the transaction pool and the network, to relay /// transactions that have been accepted. pub struct PoolToNetAdapter { - p2p: OneTime<Arc<Server>>, + p2p: OneTime<Arc<p2p::Server>>, } impl pool::PoolAdapter for PoolToNetAdapter { @@ -372,7 +372,7 @@ impl PoolToNetAdapter { } /// Setup the p2p server on the adapter - pub fn init(&self, p2p: Arc<Server>) { + pub fn init(&self, p2p: Arc<p2p::Server>) { self.p2p.init(p2p); } } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index c6878d5cc..b9ba6581c 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -39,7 +39,6 @@ const PEER_PREFERRED_COUNT: u32 = 8; const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt"; pub struct Seeder { - peer_store: Arc<p2p::PeerStore>, p2p: Arc<p2p::Server>, capabilities: p2p::Capabilities, @@ -48,11 +47,9 @@ pub struct Seeder { impl Seeder { pub fn new( capabilities: p2p::Capabilities, - peer_store: Arc<p2p::PeerStore>, p2p: Arc<p2p::Server>, ) -> Seeder { Seeder { - peer_store: peer_store, p2p: p2p, capabilities: capabilities, } @@ -82,7 +79,6 @@ impl Seeder { &self, tx: mpsc::UnboundedSender<SocketAddr>, ) -> Box<Future<Item = (), Error = String>> { - let peer_store = self.peer_store.clone(); let p2p_server = self.p2p.clone(); // now spawn a new future to regularly check if we need to acquire more peers @@ -90,7 +86,7 @@ impl Seeder { let mon_loop = Timer::default() .interval(time::Duration::from_secs(10)) .for_each(move |_| { - debug!(LOGGER, "monitoring peers ({})", p2p_server.all_peers().len()); + debug!(LOGGER, "monitoring peers ({})", p2p_server.connected_peers().len()); // maintenance step first, clean up p2p server peers and mark bans // if needed @@ -100,7 +96,7 @@ impl Seeder { if p.is_banned() { debug!(LOGGER, "Marking peer {} as banned.", p.info.addr); let update_result = - peer_store.update_state(p.info.addr, p2p::State::Banned); + p2p_server.update_state(p.info.addr, p2p::State::Banned); match update_result { Ok(()) => {} Err(_) => {} @@ -110,12 +106,12 @@ impl Seeder { // we don't have enough peers, getting more from db if p2p_server.peer_count() < PEER_PREFERRED_COUNT { - let mut peers = peer_store.find_peers( + let mut peers = p2p_server.find_peers( p2p::State::Healthy, p2p::UNKNOWN, (2 * PEER_MAX_COUNT) as usize, ); - peers.retain(|p| !p2p_server.is_known(p.addr)); + peers.retain(|p| !p2p_server.is_known(&p.addr)); if peers.len() > 0 { debug!( LOGGER, @@ -137,21 +133,21 @@ impl Seeder { } // Check if we have any pre-existing peer in db. If so, start with those, - // otherwise use the seeds provided. + // otherwise use the seeds provided. fn connect_to_seeds( &self, tx: mpsc::UnboundedSender<SocketAddr>, seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>, ) -> Box<Future<Item = (), Error = String>> { - let peer_store = self.peer_store.clone(); // a thread pool is required so we don't block the event loop with a - // db query + // db query let thread_pool = cpupool::CpuPool::new(1); + let p2p_server = self.p2p.clone(); let seeder = thread_pool .spawn_fn(move || { // check if we have some peers in db - let peers = peer_store.find_peers( + let peers = p2p_server.find_peers( p2p::State::Healthy, p2p::FULL_HIST, (2 * PEER_MAX_COUNT) as usize, @@ -192,7 +188,6 @@ impl Seeder { rx: mpsc::UnboundedReceiver<SocketAddr>, ) -> Box<Future<Item = (), Error = ()>> { let capab = self.capabilities; - let p2p_store = self.peer_store.clone(); let p2p_server = self.p2p.clone(); let listener = rx.for_each(move |peer_addr| { @@ -202,7 +197,6 @@ impl Seeder { h.spawn( connect_and_req( capab, - p2p_store.clone(), p2p_server.clone(), inner_h, peer_addr, @@ -271,7 +265,6 @@ pub fn predefined_seeds( fn connect_and_req( capab: p2p::Capabilities, - peer_store: Arc<p2p::PeerStore>, p2p: Arc<p2p::Server>, h: reactor::Handle, addr: SocketAddr, @@ -279,6 +272,7 @@ fn connect_and_req( let connect_peer = p2p.connect_peer(addr, h).map_err(|_| ()); let timer = Timer::default(); let timeout = timer.timeout(connect_peer, Duration::from_secs(5)); + let p2p_server = p2p.clone(); let fut = timeout.then(move |p| { match p { @@ -291,7 +285,7 @@ fn connect_and_req( } } Err(_) => { - let update_result = peer_store.update_state(addr, p2p::State::Defunct); + let update_result = p2p_server.update_state(addr, p2p::State::Defunct); match update_result { Ok(()) => {} Err(_) => {} diff --git a/grin/src/server.rs b/grin/src/server.rs index 52fb9b449..61e9660a9 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -16,7 +16,6 @@ //! the peer-to-peer server, the blockchain and the transaction pool) and acts //! as a facade. -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::thread; @@ -109,29 +108,23 @@ impl Server { pool_adapter.set_chain(shared_chain.clone()); - // Currently connected peers. Used by both the net_adapter and the p2p_server. - let connected_peers = Arc::new(RwLock::new(HashMap::new())); - - let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); let net_adapter = Arc::new(NetToChainAdapter::new( shared_chain.clone(), tx_pool.clone(), - peer_store.clone(), - connected_peers.clone(), )); let p2p_server = Arc::new(p2p::Server::new( + config.db_root.clone(), config.capabilities, config.p2p_config.unwrap(), - connected_peers.clone(), net_adapter.clone(), genesis.hash(), - )); - + )?); chain_adapter.init(p2p_server.clone()); pool_net_adapter.init(p2p_server.clone()); + net_adapter.init(p2p_server.clone()); - let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone()); + let seed = seed::Seeder::new(config.capabilities, p2p_server.clone()); match config.seeding_type.clone() { Seeding::None => { warn!( @@ -158,11 +151,11 @@ impl Server { _ => {} } - // If we have any known seeds or peers then attempt to sync. - if config.seeding_type != Seeding::None || peer_store.all_peers().len() > 0 { - let sync = sync::Syncer::new(shared_chain.clone(), p2p_server.clone()); - net_adapter.start_sync(sync); - } + sync::run_sync( + net_adapter.clone(), + p2p_server.clone(), + shared_chain.clone(), + ); evt_handle.spawn(p2p_server.start(evt_handle.clone()).map_err(|_| ())); @@ -173,7 +166,6 @@ impl Server { shared_chain.clone(), tx_pool.clone(), p2p_server.clone(), - peer_store.clone(), ); warn!(LOGGER, "Grin server started."); @@ -214,8 +206,10 @@ impl Server { let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port)); thread::spawn(move || { + // TODO push this down in the run loop so miner gets paused anytime we + // decide to sync again let secs_5 = time::Duration::from_secs(5); - while net_adapter.syncing() { + while net_adapter.is_syncing() { thread::sleep(secs_5); } miner.run_loop(config.clone(), cuckoo_size as u32, proof_size); diff --git a/grin/src/sync.rs b/grin/src/sync.rs index ea75a4710..380b39512 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -1,4 +1,4 @@ -// Copyright 2016 The Grin Developers +// Copyright 2017 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,294 +12,182 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Synchronization of the local blockchain with the rest of the network. Used -//! either on a brand new node or when a node is late based on others' heads. -//! Always starts by downloading the header chain before asking either for full -//! blocks or a full UTXO set with related information. - -/// How many block bodies to download in parallel -const MAX_BODY_DOWNLOADS: usize = 8; - -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex}; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::sync::{Arc, RwLock}; -use core::core::hash::{Hash, Hashed}; +use adapters::NetToChainAdapter; use chain; -use p2p; +use core::core::hash::{Hash, Hashed}; +use p2p::{self, Peer, NetAdapter}; use types::Error; use util::LOGGER; -#[derive(Debug)] -struct BlockDownload { - hash: Hash, - start_time: Instant, - retries: u8, -} - -/// Manages syncing the local chain with other peers. Needs both a head chain -/// and a full block chain to operate. First tries to advance the header -/// chain as much as possible, then downloads the full blocks by batches. -pub struct Syncer { +/// Starts the syncing loop, just spawns two threads that loop forever +pub fn run_sync( + adapter: Arc<NetToChainAdapter>, + p2p_server: Arc<p2p::Server>, chain: Arc<chain::Chain>, - p2p: Arc<p2p::Server>, - - sync: Mutex<bool>, - last_header_req: Mutex<Instant>, - blocks_to_download: Mutex<Vec<Hash>>, - blocks_downloading: Mutex<Vec<BlockDownload>>, -} - -impl Syncer { - pub fn new(chain_ref: Arc<chain::Chain>, p2p: Arc<p2p::Server>) -> Syncer { - Syncer { - chain: chain_ref, - p2p: p2p, - sync: Mutex::new(true), - last_header_req: Mutex::new(Instant::now() - Duration::from_secs(2)), - blocks_to_download: Mutex::new(vec![]), - blocks_downloading: Mutex::new(vec![]), - } - } - - pub fn syncing(&self) -> bool { - *self.sync.lock().unwrap() - } - - /// Checks the local chain state, comparing it with our peers and triggers - /// syncing if required. - pub fn run(&self) -> Result<(), Error> { - info!(LOGGER, "Sync: starting sync"); - - // Loop for 10s waiting for some peers to potentially sync from. - let start = Instant::now(); - loop { - let pc = self.p2p.peer_count(); - if pc > 3 { - break; - } - if Instant::now() - start > Duration::from_secs(10) { - break; - } - thread::sleep(Duration::from_millis(200)); - } - - // Now check we actually have at least one peer to sync from. - // If not then end the sync cleanly. - if self.p2p.peer_count() == 0 { - info!(LOGGER, "Sync: no peers to sync with, done."); - - let mut sync = self.sync.lock().unwrap(); - *sync = false; - - return Ok(()) - } - - // check if we have missing full blocks for which we already have a header - self.init_download()?; - - // main syncing loop, requests more headers and bodies periodically as long - // as a peer with higher difficulty exists and we're not fully caught up - info!(LOGGER, "Sync: Starting loop."); - loop { - let tip = self.chain.get_header_head()?; - - // TODO do something better (like trying to get more) if we lose peers - let peer = self.p2p.most_work_peer().expect("No peers available for sync."); - let peer = peer.read().unwrap(); - debug!( - LOGGER, - "Sync: peer {} vs us {}", - peer.info.total_difficulty, - tip.total_difficulty - ); - - let more_headers = peer.info.total_difficulty > tip.total_difficulty; - let more_bodies = { - let blocks_to_download = self.blocks_to_download.lock().unwrap(); - let blocks_downloading = self.blocks_downloading.lock().unwrap(); - debug!( - LOGGER, - "Sync: blocks to download {}, block downloading {}", - blocks_to_download.len(), - blocks_downloading.len(), - ); - blocks_to_download.len() > 0 || blocks_downloading.len() > 0 - }; - - { - let last_header_req = self.last_header_req.lock().unwrap().clone(); - if more_headers || (Instant::now() - Duration::from_secs(30) > last_header_req) { - self.request_headers()?; +) { + let a_inner = adapter.clone(); + let p2p_inner = p2p_server.clone(); + let c_inner = chain.clone(); + let _ = thread::Builder::new() + .name("body_sync".to_string()) + .spawn(move || { + loop { + if a_inner.is_syncing() { + body_sync(p2p_inner.clone(), c_inner.clone()); + } else { + thread::sleep(Duration::from_secs(5)); } } - if more_bodies { - self.request_bodies(); + }); + let _ = thread::Builder::new() + .name("header_sync".to_string()) + .spawn(move || { + loop { + if adapter.is_syncing() { + header_sync(adapter.clone(), p2p_server.clone(), chain.clone()); + } else { + thread::sleep(Duration::from_secs(5)); + } } - if !more_headers && !more_bodies { - // TODO check we haven't been lied to on the total work - let mut sync = self.sync.lock().unwrap(); - *sync = false; + }); +} + +fn body_sync( + p2p_server: Arc<p2p::Server>, + chain: Arc<chain::Chain>, +) { + debug!(LOGGER, "block_sync: loop"); + + let header_head = chain.get_header_head().unwrap(); + let block_header = chain.head_header().unwrap(); + let mut hashes = vec![]; + + if header_head.total_difficulty > block_header.total_difficulty { + let mut current = chain.get_block_header(&header_head.last_block_h); + while let Ok(header) = current { + if header.hash() == block_header.hash() { break; } - - thread::sleep(Duration::from_secs(2)); + hashes.push(header.hash()); + current = chain.get_block_header(&header.previous); } - info!(LOGGER, "Sync: done."); - Ok(()) } + hashes.reverse(); - /// Checks the gap between the header chain and the full block chain and - /// initializes the blocks_to_download structure with the missing full - /// blocks - fn init_download(&self) -> Result<(), Error> { - // compare the header's head to the full one to see what we're missing - let header_head = self.chain.get_header_head()?; - let full_head = self.chain.head()?; - let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); - - // go back the chain and insert for download all blocks we only have the - // head for - let mut prev_h = header_head.last_block_h; - while prev_h != full_head.last_block_h { - let header = self.chain.get_block_header(&prev_h)?; - if header.height < full_head.height { - break; - } - blocks_to_download.push(header.hash()); - prev_h = header.previous; - } + let hashes_to_get = hashes + .iter() + .filter(|x| !chain.get_block(&x).is_ok()) + .take(10) + .cloned() + .collect::<Vec<_>>(); + if hashes_to_get.len() > 0 { debug!( LOGGER, - "Sync: Added {} full block hashes to download.", - blocks_to_download.len() - ); - Ok(()) - } + "block_sync: requesting blocks ({}/{}), {:?}", + block_header.height, + header_head.height, + hashes_to_get, + ); - /// Asks for the blocks we haven't downloaded yet and place them in the - /// downloading structure. - fn request_bodies(&self) { - let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); - let mut blocks_downloading = self.blocks_downloading.lock().unwrap(); - - // retry blocks not downloading - let now = Instant::now(); - for download in blocks_downloading.deref_mut() { - let elapsed = (now - download.start_time).as_secs(); - if download.retries >= 12 { - panic!("Failed to download required block {}", download.hash); - } - if download.retries < (elapsed / 5) as u8 { - debug!( - LOGGER, - "Sync: Retry {} on block {}", - download.retries, - download.hash - ); - self.request_block(download.hash); - download.retries += 1; + for hash in hashes_to_get.clone() { + let peer = if hashes_to_get.len() < 100 { + p2p_server.most_work_peer() + } else { + p2p_server.random_peer() + }; + if let Some(peer) = peer { + let peer = peer.read().unwrap(); + if let Err(e) = peer.send_block_request(hash) { + debug!(LOGGER, "block_sync: error requesting block: {:?}, {:?}", hash, e); + } } } - - // consume hashes from blocks to download, place them in downloading and - // request them from the network - let mut count = 0; - while blocks_to_download.len() > 0 && blocks_downloading.len() < MAX_BODY_DOWNLOADS { - let h = blocks_to_download.pop().unwrap(); - self.request_block(h); - count += 1; - blocks_downloading.push(BlockDownload { - hash: h, - start_time: Instant::now(), - retries: 0, - }); - } - debug!( - LOGGER, - "Sync: Requested {} full blocks to download, total left: {}. Current list: {:?}.", - count, - blocks_to_download.len(), - blocks_downloading.deref(), - ); + thread::sleep(Duration::from_secs(1)); + } else { + thread::sleep(Duration::from_secs(5)); } +} - /// We added a block, clean up the downloading structure - pub fn block_received(&self, bh: Hash) { - // just clean up the downloading list - let mut bds = self.blocks_downloading.lock().unwrap(); - bds.iter() - .position(|ref h| h.hash == bh) - .map(|n| bds.remove(n)); - } +pub fn header_sync( + adapter: Arc<NetToChainAdapter>, + p2p_server: Arc<p2p::Server>, + chain: Arc<chain::Chain>, + ) { + debug!(LOGGER, "header_sync: loop"); - /// Request some block headers from a peer to advance us - fn request_headers(&self) -> Result<(), Error> { - { - let mut last_header_req = self.last_header_req.lock().unwrap(); - *last_header_req = Instant::now(); - } + let difficulty = adapter.total_difficulty(); - let tip = self.chain.get_header_head()?; - let peer = self.p2p.most_work_peer(); - let locator = self.get_locator(&tip)?; - if let Some(p) = peer { - let p = p.read().unwrap(); + if let Some(peer) = p2p_server.most_work_peer() { + let peer = peer.clone(); + let p = peer.read().unwrap(); + let peer_difficulty = p.info.total_difficulty.clone(); + + if peer_difficulty > difficulty { debug!( LOGGER, - "Sync: Asking peer {} for more block headers, locator: {:?}", - p.info.addr, - locator, - ); - if let Err(e) = p.send_header_request(locator) { - debug!(LOGGER, "Sync: peer error, will retry"); - } - } else { - warn!(LOGGER, "Sync: Could not get most worked peer to request headers."); - } - Ok(()) - } + "header_sync: difficulty {} vs {}", + peer_difficulty, + difficulty, + ); - /// We added a header, add it to the full block download list - pub fn headers_received(&self, bhs: Vec<Hash>) { - let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); - for h in bhs { - // enlist for full block download - blocks_to_download.insert(0, h); - } - - // we may still have more headers to retrieve but the main loop - // will take care of this for us - } - - /// Builds a vector of block hashes that should help the remote peer sending - /// us the right block headers. - fn get_locator(&self, tip: &chain::Tip) -> Result<Vec<Hash>, Error> { - let heights = get_locator_heights(tip.height); - - debug!(LOGGER, "Sync: locator heights: {:?}", heights); - - let locator = heights - .into_iter() - .map(|h| self.chain.get_header_by_height(h)) - .filter(|h| h.is_ok()) - .map(|h| h.unwrap().hash()) - .collect(); - debug!(LOGGER, "Sync: locator: {:?}", locator); - Ok(locator) - } - - /// Pick a random peer and ask for a block by hash - fn request_block(&self, h: Hash) { - if let Some(peer) = self.p2p.random_peer() { - let peer = peer.read().unwrap(); - if let Err(e) = peer.send_block_request(h) { - debug!(LOGGER, "Sync: Error requesting block: {:?}", e); - } + let _ = request_headers( + peer.clone(), + chain.clone(), + ); } } + + thread::sleep(Duration::from_secs(30)); +} + +/// Request some block headers from a peer to advance us +fn request_headers( + peer: Arc<RwLock<Peer>>, + chain: Arc<chain::Chain>, +) -> Result<(), Error> { + let locator = get_locator(chain)?; + let peer = peer.read().unwrap(); + debug!( + LOGGER, + "Sync: Asking peer {} for more block headers, locator: {:?}", + peer.info.addr, + locator, + ); + let _ = peer.send_header_request(locator); + Ok(()) +} + +fn get_locator(chain: Arc<chain::Chain>) -> Result<Vec<Hash>, Error> { + let tip = chain.get_header_head()?; + + // TODO - is this necessary? + // go back to earlier header height to ensure we do not miss a header + let height = if tip.height > 5 { + tip.height - 5 + } else { + 0 + }; + let heights = get_locator_heights(height); + + debug!(LOGGER, "Sync: locator heights: {:?}", heights); + + let mut locator = vec![]; + let mut current = chain.get_block_header(&tip.last_block_h); + while let Ok(header) = current { + if heights.contains(&header.height) { + locator.push(header.hash()); + } + current = chain.get_block_header(&header.previous); + } + + debug!(LOGGER, "Sync: locator: {:?}", locator); + + Ok(locator) } // current height back to 0 decreasing in powers of 2 diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index dd2552e5f..cf23fc837 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -55,5 +55,5 @@ mod types; pub use server::{DummyAdapter, Server}; pub use peer::Peer; pub use types::{Capabilities, Error, NetAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE, - MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN}; -pub use store::{PeerData, PeerStore, State}; + MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN}; +pub use store::{PeerData, State}; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 3e0449ba9..c54fac1fa 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -229,8 +229,8 @@ impl NetAdapter for TrackingAdapter { self.adapter.block_received(b) } - fn headers_received(&self, bh: Vec<core::BlockHeader>) { - self.adapter.headers_received(bh) + fn headers_received(&self, bh: Vec<core::BlockHeader>, addr: SocketAddr) { + self.adapter.headers_received(bh, addr) } fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 46a9142d3..f6f192891 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -211,7 +211,7 @@ fn handle_payload( } Type::Headers => { let headers = ser::deserialize::<Headers>(&mut &buf[..])?; - adapter.headers_received(headers.headers); + adapter.headers_received(headers.headers, addr); Ok(None) } Type::GetPeerAddrs => { diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 3c20bfe16..9ffcaa1a4 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -34,6 +34,7 @@ use core::core::hash::Hash; use core::core::target::Difficulty; use handshake::Handshake; use peer::Peer; +use store::{PeerStore, PeerData, State}; use types::*; use util::LOGGER; @@ -45,7 +46,7 @@ impl NetAdapter for DummyAdapter { } fn transaction_received(&self, _: core::Transaction) {} fn block_received(&self, _: core::Block) {} - fn headers_received(&self, _: Vec<core::BlockHeader>) {} + fn headers_received(&self, _: Vec<core::BlockHeader>, _: SocketAddr) {} fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> { vec![] } @@ -65,6 +66,7 @@ impl NetAdapter for DummyAdapter { pub struct Server { config: P2PConfig, capabilities: Capabilities, + store: Arc<PeerStore>, peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>, handshake: Arc<Handshake>, adapter: Arc<NetAdapter>, @@ -78,20 +80,21 @@ unsafe impl Send for Server {} impl Server { /// Creates a new idle p2p server with no peers pub fn new( + db_root: String, capab: Capabilities, config: P2PConfig, - peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>, adapter: Arc<NetAdapter>, genesis: Hash, - ) -> Server { - Server { + ) -> Result<Server, Error> { + Ok(Server { config: config, capabilities: capab, - peers: peers, + store: Arc::new(PeerStore::new(db_root)?), + peers: Arc::new(RwLock::new(HashMap::new())), handshake: Arc::new(Handshake::new(genesis)), adapter: adapter, stop: RefCell::new(None), - } + }) } /// Starts the p2p server. Opens a TCP port to allow incoming @@ -185,7 +188,7 @@ impl Server { addr: SocketAddr, h: reactor::Handle, ) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> { - if let Some(p) = self.get_peer(addr) { + if let Some(p) = self.get_peer(&addr) { // if we're already connected to the addr, just return the peer return Box::new(future::ok(Some(p))); } @@ -229,17 +232,17 @@ impl Server { } /// Check if the server already knows this peer (is already connected). - pub fn is_known(&self, addr: SocketAddr) -> bool { + pub fn is_known(&self, addr: &SocketAddr) -> bool { self.get_peer(addr).is_some() } - pub fn all_peers(&self) -> Vec<Arc<RwLock<Peer>>> { + pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> { self.peers.read().unwrap().values().map(|p| p.clone()).collect() } /// Get a peer we're connected to by address. - pub fn get_peer(&self, addr: SocketAddr) -> Option<Arc<RwLock<Peer>>> { - self.peers.read().unwrap().get(&addr).map(|p| p.clone()) + pub fn get_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> { + self.peers.read().unwrap().get(addr).map(|p| p.clone()) } /// Have the server iterate over its peer list and prune all peers we have @@ -249,7 +252,7 @@ impl Server { let mut rm = vec![]; // build a list of peers to be cleaned up - for peer in self.all_peers() { + for peer in self.connected_peers() { let peer_inner = peer.read().unwrap(); if !peer_inner.is_connected() { debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); @@ -270,7 +273,7 @@ impl Server { /// Returns the peer with the most worked branch, showing the highest total /// difficulty. pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> { - let mut peers = self.all_peers(); + let mut peers = self.connected_peers(); if peers.len() == 0 { return None; } @@ -293,7 +296,7 @@ impl Server { let difficulty = self.adapter.total_difficulty(); let peers = self - .all_peers() + .connected_peers() .iter() .filter(|x| { let peer = x.read().unwrap(); @@ -312,7 +315,7 @@ impl Server { /// may drop the broadcast request if it knows the remote peer already has /// the block. pub fn broadcast_block(&self, b: &core::Block) { - let peers = self.all_peers(); + let peers = self.connected_peers(); let mut count = 0; for p in peers { let p = p.read().unwrap(); @@ -331,7 +334,7 @@ impl Server { /// implementation may drop the broadcast request if it knows the /// remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let peers = self.all_peers(); + let peers = self.connected_peers(); for p in peers { let p = p.read().unwrap(); if p.is_connected() { @@ -344,19 +347,44 @@ impl Server { /// Number of peers we're currently connected to. pub fn peer_count(&self) -> u32 { - self.all_peers().len() as u32 + self.connected_peers().len() as u32 } /// Stops the server. Disconnect from all peers at the same time. pub fn stop(self) { info!(LOGGER, "calling stop on server"); - let peers = self.all_peers(); + let peers = self.connected_peers(); for peer in peers { let peer = peer.read().unwrap(); peer.stop(); } self.stop.into_inner().unwrap().send(()).unwrap(); } + + /// All peer information we have in storage + pub fn all_peers(&self) -> Vec<PeerData> { + self.store.all_peers() + } + + /// Find peers in store (not necessarily connected) and return their data + pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> { + self.store.find_peers(state, cap, count) + } + + /// Whether we've already seen a peer with the provided address + pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> { + self.store.exists_peer(peer_addr).map_err(From::from) + } + + /// Saves updated information about a peer + pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { + self.store.save_peer(p).map_err(From::from) + } + + /// Updates the state of a peer in store + pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { + self.store.update_state(peer_addr, new_state).map_err(From::from) + } } // Adds the peer built by the provided future in the peers map diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 62fc5cf7e..78d72c7b9 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -25,6 +25,7 @@ use core::core; use core::core::hash::Hash; use core::core::target::Difficulty; use core::ser; +use grin_store; /// Maximum number of block headers a peer should ever send pub const MAX_BLOCK_HEADERS: u32 = 512; @@ -42,6 +43,7 @@ pub enum Error { Connection(io::Error), ConnectionClose, Timeout, + Store(grin_store::Error), PeerWithSelf, ProtocolMismatch { us: u32, @@ -58,6 +60,11 @@ impl From<ser::Error> for Error { Error::Serialization(e) } } +impl From<grin_store::Error> for Error { + fn from(e: grin_store::Error) -> Error { + Error::Store(e) + } +} impl From<io::Error> for Error { fn from(e: io::Error) -> Error { Error::Connection(e) @@ -167,7 +174,7 @@ pub trait NetAdapter: Sync + Send { /// A set of block header has been received, typically in response to a /// block /// header request. - fn headers_received(&self, bh: Vec<core::BlockHeader>); + fn headers_received(&self, bh: Vec<core::BlockHeader>, addr: SocketAddr); /// Finds a list of block headers based on the provided locator. Tries to /// identify the common chain and gets the headers that follow it diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 6ffe43fa8..5756761cd 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -17,9 +17,8 @@ extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate tokio_core; -use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time; use futures::future::Future; @@ -38,14 +37,13 @@ fn peer_handshake() { let handle = evtlp.handle(); let p2p_conf = p2p::P2PConfig::default(); let net_adapter = Arc::new(p2p::DummyAdapter {}); - let connected_peers = Arc::new(RwLock::new(HashMap::new())); let server = p2p::Server::new( + ".grin".to_owned(), p2p::UNKNOWN, p2p_conf, - connected_peers, net_adapter.clone(), Hash::from_vec(vec![]), - ); + ).unwrap(); let run_server = server.start(handle.clone()); let my_addr = "127.0.0.1:5000".parse().unwrap();