Variety of small p2p fixes to block sync

Adds a couple utility function to p2p server to check whether
we're already connected to a peer before trying again. Other
very minor fixes and logging improvements.
This commit is contained in:
Ignotus Peverell 2017-04-27 22:05:12 -07:00
parent a7089d1975
commit 1b78a73328
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
6 changed files with 65 additions and 23 deletions

View file

@ -94,10 +94,11 @@ impl NetAdapter for NetToChainAdapter {
Ok(_) => { Ok(_) => {
added_hs.push(bh.hash()); added_hs.push(bh.hash());
} }
Err(chain::Error::Unfit(_)) => { Err(chain::Error::Unfit(s)) => {
info!("Received unfit block header {} at {}.", info!("Received unfit block header {} at {}: {}.",
bh.hash(), bh.hash(),
bh.height); bh.height,
s);
} }
Err(chain::Error::StoreErr(e)) => { Err(chain::Error::StoreErr(e)) => {
error!("Store error processing block header {}: {:?}", bh.hash(), e); error!("Store error processing block header {}: {:?}", bh.hash(), e);

View file

@ -98,6 +98,8 @@ impl Seeder {
let mut peers = peer_store.find_peers(p2p::State::Healthy, let mut peers = peer_store.find_peers(p2p::State::Healthy,
p2p::UNKNOWN, p2p::UNKNOWN,
(2 * PEER_MAX_COUNT) as usize); (2 * PEER_MAX_COUNT) as usize);
peers.retain(|p| !p2p_server.is_known(p.addr));
if peers.len() > 0 {
debug!("Got {} more peers from db, trying to connect.", peers.len()); debug!("Got {} more peers from db, trying to connect.", peers.len());
thread_rng().shuffle(&mut peers[..]); thread_rng().shuffle(&mut peers[..]);
let sz = min(PEER_PREFERRED_COUNT as usize, peers.len()); let sz = min(PEER_PREFERRED_COUNT as usize, peers.len());
@ -105,9 +107,11 @@ impl Seeder {
tx.send(p.addr).unwrap(); tx.send(p.addr).unwrap();
} }
} }
}
Ok(()) Ok(())
}) })
.map_err(|e| e.to_string()); .map_err(|e| e.to_string());
Box::new(mon_loop) Box::new(mon_loop)
} }
@ -223,15 +227,17 @@ fn connect_and_req(capab: p2p::Capabilities,
addr: SocketAddr) addr: SocketAddr)
-> Box<Future<Item = (), Error = ()>> { -> Box<Future<Item = (), Error = ()>> {
let fut = p2p.connect_peer(addr, h) let fut = p2p.connect_peer(addr, h)
.and_then(move |p| { .then(move |p| {
if let Some(p) = p { match p {
Ok(Some(p)) => {
p.send_peer_request(capab); p.send_peer_request(capab);
} }
Err(e) => {
error!("Peer request error: {:?}", e);
}
_ => {}
}
Ok(()) Ok(())
})
.map_err(|e| {
error!("Peer request error {:?}", e);
()
}); });
Box::new(fut) Box::new(fut)
} }

View file

@ -30,6 +30,7 @@ use api;
use chain; use chain;
use chain::ChainStore; use chain::ChainStore;
use core; use core;
use core::core::hash::Hashed;
use miner; use miner;
use p2p; use p2p;
use seed; use seed;
@ -174,5 +175,14 @@ fn store_head(config: &ServerConfig)
} }
Err(e) => return Err(Error::Store(e)), Err(e) => return Err(Error::Store(e)),
}; };
let head = chain_store.head()?;
let head_header = chain_store.head_header()?;
info!("Starting server with head {} at {} and header head {} at {}",
head.last_block_h,
head.height,
head_header.hash(),
head_header.height);
Ok((Arc::new(chain_store), head)) Ok((Arc::new(chain_store), head))
} }

View file

@ -66,7 +66,7 @@ impl Syncer {
if pc > 3 { if pc > 3 {
break; break;
} }
if pc > 0 && (Instant::now() - start > Duration::from_secs(15)) { if pc > 0 && (Instant::now() - start > Duration::from_secs(10)) {
break; break;
} }
thread::sleep(Duration::from_millis(200)); thread::sleep(Duration::from_millis(200));
@ -178,7 +178,10 @@ impl Syncer {
let peer = self.p2p.most_work_peer(); let peer = self.p2p.most_work_peer();
let locator = self.get_locator(&tip)?; let locator = self.get_locator(&tip)?;
if let Some(p) = peer { if let Some(p) = peer {
debug!("Asking peer {} for more block headers.", p.info.addr); debug!("Asking peer {} for more block headers starting from {} at {}.",
p.info.addr,
tip.last_block_h,
tip.height);
p.send_header_request(locator)?; p.send_header_request(locator)?;
} else { } else {
warn!("Could not get most worked peer to request headers."); warn!("Could not get most worked peer to request headers.");

View file

@ -145,16 +145,16 @@ impl Server {
addr: SocketAddr, addr: SocketAddr,
h: reactor::Handle) h: reactor::Handle)
-> Box<Future<Item = Option<Arc<Peer>>, Error = Error>> { -> Box<Future<Item = Option<Arc<Peer>>, Error = Error>> {
for p in self.peers.read().unwrap().deref() { if let Some(p) = self.get_peer(addr) {
// if we're already connected to the addr, just return the peer // if we're already connected to the addr, just return the peer
if p.info.addr == addr { return Box::new(future::ok(Some(p)));
return Box::new(future::ok(Some((*p).clone())));
}
} }
if self.is_self(addr) {
// asked to connect to ourselves // asked to connect to ourselves
if addr.ip() == self.config.host && addr.port() == self.config.port {
return Box::new(future::ok(None)); return Box::new(future::ok(None));
} }
// cloneapalooza
let peers = self.peers.clone(); let peers = self.peers.clone();
let adapter1 = self.adapter.clone(); let adapter1 = self.adapter.clone();
let adapter2 = self.adapter.clone(); let adapter2 = self.adapter.clone();
@ -186,6 +186,27 @@ impl Server {
Box::new(request) Box::new(request)
} }
/// Check if the server already knows this peer (is already connected). In
/// addition we consider to know ourselves.
pub fn is_known(&self, addr: SocketAddr) -> bool {
self.get_peer(addr).is_some() || self.is_self(addr)
}
/// Whether the provided address is ourselves.
pub fn is_self(&self, addr: SocketAddr) -> bool {
addr.ip() == self.config.host && addr.port() == self.config.port
}
/// Get a peer we're connected to by address.
pub fn get_peer(&self, addr: SocketAddr) -> Option<Arc<Peer>> {
for p in self.peers.read().unwrap().deref() {
if p.info.addr == addr {
return Some((*p).clone());
}
}
None
}
/// Have the server iterate over its peer list and prune all peers we have /// Have the server iterate over its peer list and prune all peers we have
/// lost connection to or have been deemed problematic. The removed peers /// lost connection to or have been deemed problematic. The removed peers
/// are returned. /// are returned.

View file

@ -37,6 +37,7 @@ enum_from_primitive! {
} }
/// Data stored for any given peer we've encountered. /// Data stored for any given peer we've encountered.
#[derive(Debug)]
pub struct PeerData { pub struct PeerData {
/// Network address of the peer. /// Network address of the peer.
pub addr: SocketAddr, pub addr: SocketAddr,