more robust peer handling (#244)

* use HashMap internally for tracking connected peers (to avoid duplicates)
* reuse the handshake on the server so we can track our own nonce to avoid self connections correctly
* make sure we start up the clean_peers loop even in seedless mode
* logging in monitoring peers loop
* simplify monitoring for no seeds
* fixup and cleanup simulnet tests (real seeds in places)
* only start the sync if we have either seeds or peers that we know about; exit the syncer safely if we have no connected peers
This commit is contained in:
AntiochP 2017-11-14 13:57:16 -05:00 committed by Ignotus Peverell
parent 855602e98a
commit c2a95637b3
11 changed files with 135 additions and 81 deletions

View file

@ -89,6 +89,12 @@ impl NetAdapter for NetToChainAdapter {
}
fn headers_received(&self, bhs: Vec<core::BlockHeader>) {
info!(
LOGGER,
"Received {} block headers",
bhs.len(),
);
// try to add each header to our header chain
let mut added_hs = vec![];
for bh in bhs {
@ -134,11 +140,19 @@ impl NetAdapter for NetToChainAdapter {
}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {
debug!(
LOGGER,
"locate_headers: {:?}",
locator,
);
if locator.len() == 0 {
return vec![];
}
// go through the locator vector and check if we know any of these headers
// recursively go back through the locator vector
// and stop when we find a header that we recognize
// this will be a header shared in common between us and the peer
let known = self.chain.get_block_header(&locator[0]);
let header = match known {
Ok(header) => header,
@ -151,6 +165,12 @@ impl NetAdapter for NetToChainAdapter {
}
};
debug!(
LOGGER,
"locate_headers: {:?}",
header,
);
// looks like we know one, getting as many following headers as allowed
let hh = header.height;
let mut headers = vec![];
@ -165,6 +185,13 @@ impl NetAdapter for NetToChainAdapter {
}
}
}
debug!(
LOGGER,
"locate_headers: returning headers: {}",
headers.len(),
);
headers
}

View file

@ -63,7 +63,7 @@ impl Seeder {
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
) {
// open a channel with a listener that connects every peer address sent below
// max peer count
// max peer count
let (tx, rx) = futures::sync::mpsc::unbounded();
h.spawn(self.listen_for_addrs(h.clone(), rx));
@ -85,12 +85,14 @@ impl Seeder {
let p2p_server = self.p2p.clone();
// now spawn a new future to regularly check if we need to acquire more peers
// and if so, gets them from db
// and if so, gets them from db
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(10))
.for_each(move |_| {
debug!(LOGGER, "monitoring peers ({})", p2p_server.all_peers().len());
// maintenance step first, clean up p2p server peers and mark bans
// if needed
// if needed
let disconnected = p2p_server.clean_peers();
for p in disconnected {
if p.is_banned() {

View file

@ -122,7 +122,11 @@ impl Server {
Seeding::None => {
warn!(
LOGGER,
"No seed configured, will stay solo until connected to"
"No seed(s) configured, will stay solo until connected to"
);
seed.connect_and_monitor(
evt_handle.clone(),
seed::predefined_seeds(vec![]),
);
}
Seeding::List => {
@ -132,12 +136,16 @@ impl Server {
);
}
Seeding::WebStatic => {
seed.connect_and_monitor(evt_handle.clone(), seed::web_seeds(evt_handle.clone()));
seed.connect_and_monitor(
evt_handle.clone(),
seed::web_seeds(evt_handle.clone()),
);
}
_ => {}
}
if config.seeding_type != Seeding::None {
// If we have any known seeds or peers then attempt to sync.
if config.seeding_type != Seeding::None || peer_store.all_peers().len() > 0 {
let sync = sync::Syncer::new(shared_chain.clone(), p2p_server.clone());
net_adapter.start_sync(sync);
}

View file

@ -70,27 +70,41 @@ impl Syncer {
/// Checks the local chain state, comparing it with our peers and triggers
/// syncing if required.
pub fn run(&self) -> Result<(), Error> {
debug!(LOGGER, "Starting syncer.");
info!(LOGGER, "Sync: starting sync");
// Loop for 10s waiting for some peers to potentially sync from.
let start = Instant::now();
loop {
let pc = self.p2p.peer_count();
if pc > 3 {
break;
}
if pc > 0 && (Instant::now() - start > Duration::from_secs(10)) {
if Instant::now() - start > Duration::from_secs(10) {
break;
}
thread::sleep(Duration::from_millis(200));
}
// Now check we actually have at least one peer to sync from.
// If not then end the sync cleanly.
if self.p2p.peer_count() == 0 {
info!(LOGGER, "Sync: no peers to sync with, done.");
let mut sync = self.sync.lock().unwrap();
*sync = false;
return Ok(())
}
// check if we have missing full blocks for which we already have a header
self.init_download()?;
// main syncing loop, requests more headers and bodies periodically as long
// as a peer with higher difficulty exists and we're not fully caught up
// as a peer with higher difficulty exists and we're not fully caught up
info!(LOGGER, "Starting sync loop.");
loop {
let tip = self.chain.get_header_head()?;
// TODO do something better (like trying to get more) if we lose peers
let peer = self.p2p.most_work_peer().unwrap();
debug!(
@ -104,6 +118,12 @@ impl Syncer {
let more_bodies = {
let blocks_to_download = self.blocks_to_download.lock().unwrap();
let blocks_downloading = self.blocks_downloading.lock().unwrap();
debug!(
LOGGER,
"Sync: blocks to download {}, block downloading {}",
blocks_to_download.len(),
blocks_downloading.len(),
);
blocks_to_download.len() > 0 || blocks_downloading.len() > 0
};
@ -125,7 +145,7 @@ impl Syncer {
thread::sleep(Duration::from_secs(2));
}
info!(LOGGER, "Sync done.");
info!(LOGGER, "Sync: done.");
Ok(())
}

View file

@ -44,7 +44,7 @@ use wallet::WalletConfig;
/// Just removes all results from previous runs
pub fn clean_all_output(test_name_dir: &str) {
let target_dir = format!("target/test_servers/{}", test_name_dir);
let target_dir = format!("target/{}", test_name_dir);
let result = fs::remove_dir_all(target_dir);
if let Err(e) = result {
println!("{}", e);

View file

@ -191,7 +191,6 @@ fn a_simulate_block_propagation() {
let test_name_dir = "grin-prop";
framework::clean_all_output(test_name_dir);
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
@ -221,6 +220,8 @@ fn a_simulate_block_propagation() {
port: 18000 + n,
..p2p::P2PConfig::default()
}),
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:18000".to_string()]),
..Default::default()
},
&handle,
@ -228,23 +229,12 @@ fn a_simulate_block_propagation() {
servers.push(s);
}
// everyone connects to everyone else
for n in 0..5 {
for m in 0..5 {
if m == n {
continue;
}
let addr = format!("{}:{}", "127.0.0.1", 18000 + m);
servers[n].connect_peer(addr.parse().unwrap()).unwrap();
}
}
// start mining
servers[0].start_miner(miner_config);
let original_height = servers[0].head().height;
// monitor for a change of head on a different server and check whether
// chain height has changed
// chain height has changed
evtlp.run(change(&servers[4]).and_then(|tip| {
assert!(tip.height == original_height + 1);
Ok(())
@ -283,16 +273,16 @@ fn simulate_full_sync() {
let mut servers = vec![];
for n in 0..2 {
let mut config = grin::ServerConfig {
api_http_addr: format!("127.0.0.1:{}", 19000 + n),
db_root: format!("target/{}/grin-sync-{}", test_name_dir, n),
p2p_config: Some(p2p::P2PConfig {
port: 11000 + n,
..p2p::P2PConfig::default()
}),
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:11000".to_string()]),
..Default::default()
};
if n == 1 {
config.seeding_type = grin::Seeding::Programmatic;
}
let s = grin::Server::future(config, &handle).unwrap();
servers.push(s);
}
@ -301,10 +291,6 @@ fn simulate_full_sync() {
servers[0].start_miner(miner_config);
thread::sleep(time::Duration::from_secs(5));
// connect 1 and 2
let addr = format!("{}:{}", "127.0.0.1", 11001);
servers[0].connect_peer(addr.parse().unwrap()).unwrap();
// 2 should get blocks
evtlp.run(change(&servers[1]));
}

View file

@ -57,8 +57,16 @@ impl Handshake {
self_addr: SocketAddr,
conn: TcpStream,
) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
// prepare the first part of the hanshake
// prepare the first part of the handshake
let nonce = self.next_nonce();
debug!(
LOGGER,
"handshake connect with nonce - {}, sender - {:?}, receiver - {:?}",
nonce,
self_addr,
conn.peer_addr().unwrap(),
);
let hand = Hand {
version: PROTOCOL_VERSION,
capabilities: capab,
@ -117,7 +125,14 @@ impl Handshake {
{
// check the nonce to see if we could be trying to connect to ourselves
let nonces = nonces.read().unwrap();
debug!(
LOGGER,
"checking the nonce - {}, {:?}",
&hand.nonce,
nonces,
);
if nonces.contains(&hand.nonce) {
debug!(LOGGER, "***** nonce matches! Avoiding connecting to ourselves");
return Err(Error::Serialization(ser::Error::UnexpectedData {
expected: vec![],
received: vec![],
@ -173,6 +188,8 @@ fn extract_ip(advertised: &SocketAddr, conn: &TcpStream) -> SocketAddr {
match advertised {
&SocketAddr::V4(v4sock) => {
let ip = v4sock.ip();
debug!(LOGGER, "extract_ip - {:?}, {:?}", ip, conn.peer_addr());
if ip.is_loopback() || ip.is_unspecified() {
if let Ok(addr) = conn.peer_addr() {
return SocketAddr::new(addr.ip(), advertised.port());

View file

@ -62,7 +62,7 @@ impl Peer {
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: SocketAddr,
hs: &Handshake,
hs: Arc<Handshake>,
na: Arc<NetAdapter>,
) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn)

View file

@ -16,6 +16,7 @@
//! other peers in the network.
use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
@ -63,7 +64,8 @@ impl NetAdapter for DummyAdapter {
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
peers: Arc<RwLock<Vec<Arc<Peer>>>>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<Peer>>>>,
handshake: Arc<Handshake>,
adapter: Arc<NetAdapter>,
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
}
@ -78,7 +80,8 @@ impl Server {
Server {
config: config,
capabilities: capab,
peers: Arc::new(RwLock::new(Vec::new())),
peers: Arc::new(RwLock::new(HashMap::new())),
handshake: Arc::new(Handshake::new()),
adapter: adapter,
stop: RefCell::new(None),
}
@ -91,7 +94,7 @@ impl Server {
let socket = TcpListener::bind(&addr, &h.clone()).unwrap();
warn!(LOGGER, "P2P server started on {}", addr);
let hs = Arc::new(Handshake::new());
let handshake = self.handshake.clone();
let peers = self.peers.clone();
let adapter = self.adapter.clone();
let capab = self.capabilities.clone();
@ -104,7 +107,7 @@ impl Server {
let peers = peers.clone();
// accept the peer and add it to the server map
let accept = Peer::accept(conn, capab, total_diff, &hs.clone(), adapter.clone());
let accept = Peer::accept(conn, capab, total_diff, &handshake.clone(), adapter.clone());
let added = add_to_peers(peers, adapter, accept);
// wire in a future to timeout the accept after 5 secs
@ -153,13 +156,10 @@ impl Server {
// if we're already connected to the addr, just return the peer
return Box::new(future::ok(Some(p)));
}
if self.is_self(addr) {
// asked to connect to ourselves
return Box::new(future::ok(None));
}
// cloneapalooza
let peers = self.peers.clone();
let handshake = self.handshake.clone();
let adapter = self.adapter.clone();
let capab = self.capabilities.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port);
@ -174,13 +174,13 @@ impl Server {
let total_diff = adapter.clone().total_difficulty();
// connect to the peer and add it to the server map, wiring it a timeout for
// the handhake
// the handshake
let connect = Peer::connect(
socket,
capab,
total_diff,
self_addr,
&Handshake::new(),
handshake.clone(),
adapter.clone(),
);
let added = add_to_peers(peers, adapter, connect);
@ -196,53 +196,47 @@ impl Server {
Box::new(request)
}
/// Check if the server already knows this peer (is already connected). In
/// addition we consider to know ourselves.
/// Check if the server already knows this peer (is already connected).
pub fn is_known(&self, addr: SocketAddr) -> bool {
self.get_peer(addr).is_some() || self.is_self(addr)
}
/// Whether the provided address is ourselves.
pub fn is_self(&self, addr: SocketAddr) -> bool {
addr.ip() == self.config.host && addr.port() == self.config.port
self.get_peer(addr).is_some()
}
pub fn all_peers(&self) -> Vec<Arc<Peer>> {
self.peers.read().unwrap().clone()
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
}
/// Get a peer we're connected to by address.
pub fn get_peer(&self, addr: SocketAddr) -> Option<Arc<Peer>> {
for p in self.peers.read().unwrap().deref() {
if p.info.addr == addr {
return Some((*p).clone());
}
}
None
self.peers.read().unwrap().get(&addr).map(|p| p.clone())
}
/// Have the server iterate over its peer list and prune all peers we have
/// lost connection to or have been deemed problematic. The removed peers
/// are returned.
pub fn clean_peers(&self) -> Vec<Arc<Peer>> {
let mut peers = self.peers.write().unwrap();
let mut rm = vec![];
let (keep, rm) = peers.iter().fold((vec![], vec![]), |mut acc, ref p| {
if p.clone().is_connected() {
acc.0.push((*p).clone());
} else {
acc.1.push((*p).clone());
// build a list of peers to be cleaned up
for peer in self.all_peers() {
if !peer.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer.info.addr);
rm.push(peer);
}
acc
});
*peers = keep;
}
// now clean up peer map based on the list to remove
let mut peers = self.peers.write().unwrap();
for p in rm.clone() {
peers.remove(&p.info.addr);
}
rm
}
/// Returns the peer with the most worked branch, showing the highest total
/// difficulty.
pub fn most_work_peer(&self) -> Option<Arc<Peer>> {
let peers = self.peers.read().unwrap();
let peers = self.all_peers();
if peers.len() == 0 {
return None;
}
@ -257,7 +251,7 @@ impl Server {
/// Returns a random peer we're connected to.
pub fn random_peer(&self) -> Option<Arc<Peer>> {
let peers = self.peers.read().unwrap();
let peers = self.all_peers();
if peers.len() == 0 {
None
} else {
@ -270,7 +264,7 @@ impl Server {
/// may drop the broadcast request if it knows the remote peer already has
/// the block.
pub fn broadcast_block(&self, b: &core::Block) {
let peers = self.peers.write().unwrap();
let peers = self.all_peers();
for p in peers.deref() {
if p.is_connected() {
if let Err(e) = p.send_block(b) {
@ -284,7 +278,7 @@ impl Server {
/// implementation may drop the broadcast request if it knows the
/// remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let peers = self.peers.write().unwrap();
let peers = self.all_peers();
for p in peers.deref() {
if p.is_connected() {
if let Err(e) = p.send_transaction(tx) {
@ -301,7 +295,8 @@ impl Server {
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(self) {
let peers = self.peers.write().unwrap();
info!(LOGGER, "calling stop on server");
let peers = self.all_peers();
for p in peers.deref() {
p.stop();
}
@ -311,7 +306,7 @@ impl Server {
// Adds the peer built by the provided future in the peers map
fn add_to_peers<A>(
peers: Arc<RwLock<Vec<Arc<Peer>>>>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<Peer>>>>,
adapter: Arc<NetAdapter>,
peer_fut: A,
) -> Box<Future<Item = Result<(TcpStream, Arc<Peer>), ()>, Error = Error>>
@ -320,9 +315,10 @@ where
{
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
adapter.peer_connected(&peer.info);
let addr = peer.info.addr.clone();
let apeer = Arc::new(peer);
let mut peers = peers.write().unwrap();
peers.push(apeer.clone());
peers.insert(addr, apeer.clone());
Ok((conn, apeer))
});
Box::new(peer_add)

View file

@ -21,6 +21,7 @@ use core::ser::{self, Readable, Reader, Writeable, Writer};
use grin_store::{self, option_to_not_found, to_key, Error};
use msg::SockAddr;
use types::Capabilities;
use util::LOGGER;
const STORE_SUBPATH: &'static str = "peers";
@ -94,11 +95,8 @@ impl PeerStore {
}
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
// we want to ignore any peer without a well-defined ip
let ip = p.addr.ip();
if ip.is_unspecified() || ip.is_loopback() {
return Ok(());
}
debug!(LOGGER, "saving peer to store {:?}", p);
self.db.put_ser(
&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..],
p,

View file

@ -59,7 +59,7 @@ fn peer_handshake() {
p2p::UNKNOWN,
Difficulty::one(),
my_addr,
&p2p::handshake::Handshake::new(),
Arc::new(p2p::handshake::Handshake::new()),
net_adapter.clone(),
)
})