diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 9de05704c..4eb5e6bd0 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -27,6 +27,7 @@ use futures::sync::mpsc; use hyper; use tokio_core::reactor; use tokio_timer::Timer; +use time::now_utc; use p2p; use util::LOGGER; @@ -81,8 +82,12 @@ impl Seeder { let peers = self.peers.clone(); let capabilities = self.capabilities.clone(); + // Unban peer after 3 hours + let ban_windows: i64 = 10800; + // now spawn a new future to regularly check if we need to acquire more peers - // and if so, gets them from db + // and if so, gets them from db and unban the banned peers after the ban is + // expired let mon_loop = Timer::default() .interval(time::Duration::from_secs(30)) .for_each(move |_| { @@ -99,9 +104,23 @@ impl Seeder { let mut banned_count = 0; let mut defunct_count = 0; for x in peers.all_peers() { - if x.flags == p2p::State::Healthy { healthy_count += 1 } - else if x.flags == p2p::State::Banned { banned_count += 1 } - else if x.flags == p2p::State::Defunct { defunct_count += 1 }; + if x.flags == p2p::State::Healthy { + healthy_count += 1 + } else if x.flags == p2p::State::Banned { + let interval = now_utc().to_timespec().sec - x.last_banned; + if interval >= ban_windows { + // Unban peer + peers.unban_peer(&x.addr); + debug!( + LOGGER, + "monitor_peers: unbanned {} after {} seconds", x.addr, interval + ); + } else { + banned_count += 1; + } + } else if x.flags == p2p::State::Defunct { + defunct_count += 1 + }; } debug!( @@ -126,31 +145,19 @@ impl Seeder { if let Ok(p) = p.try_read() { debug!( LOGGER, - "monitor_peers: asking {} for more peers", - p.info.addr, + "monitor_peers: asking {} for more peers", p.info.addr, ); let _ = p.send_peer_request(capabilities); } else { - warn!( - LOGGER, - "monitor_peers: failed to get read lock on peer", - ); + warn!(LOGGER, "monitor_peers: failed to get read lock on peer",); } } // find some peers from our db // and queue them up for a connection attempt - let peers = peers.find_peers( - p2p::State::Healthy, - p2p::UNKNOWN, - 100, - ); + let peers = peers.find_peers(p2p::State::Healthy, p2p::UNKNOWN, 100); for p in peers { - debug!( - LOGGER, - "monitor_peers: queue to soon try {}", - p.addr, - ); + debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr,); tx.unbounded_send(p.addr).unwrap(); } } @@ -168,20 +175,17 @@ impl Seeder { tx: mpsc::UnboundedSender, seed_list: Box, Error = String>>, ) -> Box> { - // a thread pool is required so we don't block the event loop with a // db query let thread_pool = cpupool::Builder::new() - .pool_size(1).name_prefix("seed").create(); + .pool_size(1) + .name_prefix("seed") + .create(); let peers = self.peers.clone(); let seeder = thread_pool .spawn_fn(move || { // check if we have some peers in db - let peers = peers.find_peers( - p2p::State::Healthy, - p2p::FULL_HIST, - 100, - ); + let peers = peers.find_peers(p2p::State::Healthy, p2p::FULL_HIST, 100); Ok(peers) }) .and_then(|peers| { @@ -224,7 +228,13 @@ impl Seeder { debug!(LOGGER, "New peer address to connect to: {}.", peer_addr); let inner_h = h.clone(); if peers.peer_count() < PEER_MAX_COUNT { - h.spawn(connect_and_req(capab, p2p_server.clone(), peers.clone(), inner_h, peer_addr)) + h.spawn(connect_and_req( + capab, + p2p_server.clone(), + peers.clone(), + inner_h, + peer_addr, + )) } Box::new(future::ok(())) }); @@ -276,12 +286,10 @@ pub fn predefined_seeds( addrs_str: Vec, ) -> Box, Error = String>> { let seeds = future::ok(()).and_then(move |_| { - Ok( - addrs_str - .iter() - .map(|s| s.parse().unwrap()) - .collect::>(), - ) + Ok(addrs_str + .iter() + .map(|s| s.parse().unwrap()) + .collect::>()) }); Box::new(seeds) } @@ -302,14 +310,17 @@ fn connect_and_req( if let Ok(p) = p.try_read() { let _ = p.send_peer_request(capab); } - }, + } Ok(None) => { - debug!(LOGGER, "connect_and_req: ok but none inner (what does this mean?), {}", addr); - }, + debug!( + LOGGER, + "connect_and_req: ok but none inner (what does this mean?), {}", addr + ); + } Err(e) => { debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e); let _ = peers.update_state(addr, p2p::State::Defunct); - }, + } } Ok(()) }); diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index f064dfcca..529a4807e 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -22,9 +22,10 @@ use core::core; use core::core::hash::Hash; use core::core::target::Difficulty; use util::LOGGER; +use time; use peer::Peer; -use store::{PeerStore, PeerData, State}; +use store::{PeerData, PeerStore, State}; use types::*; #[derive(Clone)] @@ -55,6 +56,7 @@ impl Peers { capabilities: p.info.capabilities, user_agent: p.info.user_agent.clone(), flags: State::Healthy, + last_banned: 0, }; if let Err(e) = self.save_peer(&peer_data) { error!(LOGGER, "Could not save connected peer: {:?}", e); @@ -75,7 +77,12 @@ impl Peers { /// Get vec of peers we are currently connected to. pub fn connected_peers(&self) -> Vec>> { - let mut res = self.peers.read().unwrap().values().cloned().collect::>(); + let mut res = self.peers + .read() + .unwrap() + .values() + .cloned() + .collect::>(); thread_rng().shuffle(&mut res); res } @@ -90,8 +97,8 @@ impl Peers { self.connected_peers().len() as u32 } - // Return vec of connected peers that currently advertise more work (total_difficulty) - // than we do. + // Return vec of connected peers that currently advertise more work + // (total_difficulty) than we do. pub fn more_work_peers(&self) -> Vec>> { let peers = self.connected_peers(); if peers.len() == 0 { @@ -102,13 +109,9 @@ impl Peers { let mut max_peers = peers .iter() - .filter(|x| { - match x.try_read() { - Ok(peer) => { - peer.info.total_difficulty > total_difficulty - }, - Err(_) => false, - } + .filter(|x| match x.try_read() { + Ok(peer) => peer.info.total_difficulty > total_difficulty, + Err(_) => false, }) .cloned() .collect::>(); @@ -121,7 +124,7 @@ impl Peers { pub fn more_work_peer(&self) -> Option>> { match self.more_work_peers().first() { Some(x) => Some(x.clone()), - None => None + None => None, } } @@ -135,24 +138,18 @@ impl Peers { let max_total_difficulty = peers .iter() - .map(|x| { - match x.try_read() { - Ok(peer) => peer.info.total_difficulty.clone(), - Err(_) => Difficulty::zero(), - } + .map(|x| match x.try_read() { + Ok(peer) => peer.info.total_difficulty.clone(), + Err(_) => Difficulty::zero(), }) .max() .unwrap(); let mut max_peers = peers .iter() - .filter(|x| { - match x.try_read() { - Ok(peer) => { - peer.info.total_difficulty == max_total_difficulty - }, - Err(_) => false, - } + .filter(|x| match x.try_read() { + Ok(peer) => peer.info.total_difficulty == max_total_difficulty, + Err(_) => false, }) .cloned() .collect::>(); @@ -166,7 +163,7 @@ impl Peers { pub fn most_work_peer(&self) -> Option>> { match self.most_work_peers().first() { Some(x) => Some(x.clone()), - None => None + None => None, } } @@ -185,6 +182,15 @@ impl Peers { error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); } + if let Err(e) = + self.update_last_banned(peer_addr.clone(), time::now_utc().to_timespec().sec) + { + error!( + LOGGER, + "Couldn't update last_banned time {}: {:?}", peer_addr, e + ); + } + if let Some(peer) = self.get_connected_peer(peer_addr) { debug!(LOGGER, "Banning peer {}", peer_addr); // setting peer status will get it removed at the next clean_peer @@ -205,8 +211,8 @@ impl Peers { } else { error!(LOGGER, "Couldn't unban {}: peer is not banned", peer_addr) } - }, - Err(e) => error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e) + } + Err(e) => error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e), }; } @@ -295,7 +301,16 @@ impl Peers { /// Updates the state of a peer in store pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { - self.store.update_state(peer_addr, new_state).map_err(From::from) + self.store + .update_state(peer_addr, new_state) + .map_err(From::from) + } + + /// Updates the last banned time of a peer in store + pub fn update_last_banned(&self, peer_addr: SocketAddr, last_banned: i64) -> Result<(), Error> { + self.store + .update_last_banned(peer_addr, last_banned) + .map_err(From::from) } /// Iterate over the peer list and prune all peers we have @@ -337,20 +352,21 @@ impl Peers { // map peers to addrs in a block to bound how long we keep the read lock for let addrs = { - self.connected_peers().iter().map(|x| { - let p = x.read().unwrap(); - p.info.addr.clone() - }).collect::>() + self.connected_peers() + .iter() + .map(|x| { + let p = x.read().unwrap(); + p.info.addr.clone() + }) + .collect::>() }; // now remove them taking a short-lived write lock each time // maybe better to take write lock once and remove them all? - for x in addrs - .iter() - .take(excess_count) { - let mut peers = self.peers.write().unwrap(); - peers.remove(x); - } + for x in addrs.iter().take(excess_count) { + let mut peers = self.peers.write().unwrap(); + peers.remove(x); + } } pub fn stop(self) { @@ -382,7 +398,7 @@ impl ChainAdapter for Peers { true } } - fn headers_received(&self, headers: Vec, peer_addr:SocketAddr) { + fn headers_received(&self, headers: Vec, peer_addr: SocketAddr) { self.adapter.headers_received(headers, peer_addr) } fn locate_headers(&self, hs: Vec) -> Vec { @@ -416,6 +432,7 @@ impl NetAdapter for Peers { capabilities: UNKNOWN, user_agent: "".to_string(), flags: State::Healthy, + last_banned: 0, }; if let Err(e) = self.save_peer(&peer) { error!(LOGGER, "Could not save received peer address: {:?}", e); @@ -427,7 +444,7 @@ impl NetAdapter for Peers { debug!( LOGGER, "peer total_diff @ height (ping/pong): {}: {} @ {} \ - vs us: {} @ {}", + vs us: {} @ {}", addr, diff, height, diff --git a/p2p/src/store.rs b/p2p/src/store.rs index b31dc2315..4958f9551 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -50,6 +50,8 @@ pub struct PeerData { pub user_agent: String, /// State the peer has been detected with. pub flags: State, + /// The time the peer was last banned + pub last_banned: i64, } impl Writeable for PeerData { @@ -59,7 +61,8 @@ impl Writeable for PeerData { writer, [write_u32, self.capabilities.bits()], [write_bytes, &self.user_agent], - [write_u8, self.flags as u8] + [write_u8, self.flags as u8], + [write_i64, self.last_banned] ); Ok(()) } @@ -68,15 +71,17 @@ impl Writeable for PeerData { impl Readable for PeerData { fn read(reader: &mut Reader) -> Result { let addr = SockAddr::read(reader)?; - let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); + let (capab, ua, fl, lb) = ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64); let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; + let last_banned = lb; match State::from_u8(fl) { Some(flags) => Ok(PeerData { addr: addr.0, capabilities: capabilities, user_agent: user_agent, flags: flags, + last_banned: last_banned, }), None => Err(ser::Error::CorruptedData), } @@ -139,6 +144,14 @@ impl PeerStore { peer.flags = new_state; self.save_peer(&peer) } + + /// Convenience method to load a peer data, update its last banned time and + /// save it back. + pub fn update_last_banned(&self, peer_addr: SocketAddr, last_banned: i64) -> Result<(), Error> { + let mut peer = self.get_peer(peer_addr)?; + peer.last_banned = last_banned; + self.save_peer(&peer) + } } fn peer_key(peer_addr: SocketAddr) -> Vec {