Fix #439 Temporary peer banning (#631)

* Fix #439 Temporary peer banning
This commit is contained in:
Quentin Le Sceller 2018-01-18 13:39:56 -05:00 committed by Ignotus Peverell
parent dae90543c2
commit 911aadf8b4
3 changed files with 121 additions and 80 deletions

View file

@ -27,6 +27,7 @@ use futures::sync::mpsc;
use hyper;
use tokio_core::reactor;
use tokio_timer::Timer;
use time::now_utc;
use p2p;
use util::LOGGER;
@ -81,8 +82,12 @@ impl Seeder {
let peers = self.peers.clone();
let capabilities = self.capabilities.clone();
// Unban peer after 3 hours
let ban_windows: i64 = 10800;
// now spawn a new future to regularly check if we need to acquire more peers
// and if so, gets them from db
// and if so, gets them from db and unban the banned peers after the ban is
// expired
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(30))
.for_each(move |_| {
@ -99,9 +104,23 @@ impl Seeder {
let mut banned_count = 0;
let mut defunct_count = 0;
for x in peers.all_peers() {
if x.flags == p2p::State::Healthy { healthy_count += 1 }
else if x.flags == p2p::State::Banned { banned_count += 1 }
else if x.flags == p2p::State::Defunct { defunct_count += 1 };
if x.flags == p2p::State::Healthy {
healthy_count += 1
} else if x.flags == p2p::State::Banned {
let interval = now_utc().to_timespec().sec - x.last_banned;
if interval >= ban_windows {
// Unban peer
peers.unban_peer(&x.addr);
debug!(
LOGGER,
"monitor_peers: unbanned {} after {} seconds", x.addr, interval
);
} else {
banned_count += 1;
}
} else if x.flags == p2p::State::Defunct {
defunct_count += 1
};
}
debug!(
@ -126,31 +145,19 @@ impl Seeder {
if let Ok(p) = p.try_read() {
debug!(
LOGGER,
"monitor_peers: asking {} for more peers",
p.info.addr,
"monitor_peers: asking {} for more peers", p.info.addr,
);
let _ = p.send_peer_request(capabilities);
} else {
warn!(
LOGGER,
"monitor_peers: failed to get read lock on peer",
);
warn!(LOGGER, "monitor_peers: failed to get read lock on peer",);
}
}
// find some peers from our db
// and queue them up for a connection attempt
let peers = peers.find_peers(
p2p::State::Healthy,
p2p::UNKNOWN,
100,
);
let peers = peers.find_peers(p2p::State::Healthy, p2p::UNKNOWN, 100);
for p in peers {
debug!(
LOGGER,
"monitor_peers: queue to soon try {}",
p.addr,
);
debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr,);
tx.unbounded_send(p.addr).unwrap();
}
}
@ -168,20 +175,17 @@ impl Seeder {
tx: mpsc::UnboundedSender<SocketAddr>,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
) -> Box<Future<Item = (), Error = String>> {
// a thread pool is required so we don't block the event loop with a
// db query
let thread_pool = cpupool::Builder::new()
.pool_size(1).name_prefix("seed").create();
.pool_size(1)
.name_prefix("seed")
.create();
let peers = self.peers.clone();
let seeder = thread_pool
.spawn_fn(move || {
// check if we have some peers in db
let peers = peers.find_peers(
p2p::State::Healthy,
p2p::FULL_HIST,
100,
);
let peers = peers.find_peers(p2p::State::Healthy, p2p::FULL_HIST, 100);
Ok(peers)
})
.and_then(|peers| {
@ -224,7 +228,13 @@ impl Seeder {
debug!(LOGGER, "New peer address to connect to: {}.", peer_addr);
let inner_h = h.clone();
if peers.peer_count() < PEER_MAX_COUNT {
h.spawn(connect_and_req(capab, p2p_server.clone(), peers.clone(), inner_h, peer_addr))
h.spawn(connect_and_req(
capab,
p2p_server.clone(),
peers.clone(),
inner_h,
peer_addr,
))
}
Box::new(future::ok(()))
});
@ -276,12 +286,10 @@ pub fn predefined_seeds(
addrs_str: Vec<String>,
) -> Box<Future<Item = Vec<SocketAddr>, Error = String>> {
let seeds = future::ok(()).and_then(move |_| {
Ok(
addrs_str
.iter()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>(),
)
Ok(addrs_str
.iter()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>())
});
Box::new(seeds)
}
@ -302,14 +310,17 @@ fn connect_and_req(
if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab);
}
},
}
Ok(None) => {
debug!(LOGGER, "connect_and_req: ok but none inner (what does this mean?), {}", addr);
},
debug!(
LOGGER,
"connect_and_req: ok but none inner (what does this mean?), {}", addr
);
}
Err(e) => {
debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e);
let _ = peers.update_state(addr, p2p::State::Defunct);
},
}
}
Ok(())
});

View file

@ -22,9 +22,10 @@ use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use util::LOGGER;
use time;
use peer::Peer;
use store::{PeerStore, PeerData, State};
use store::{PeerData, PeerStore, State};
use types::*;
#[derive(Clone)]
@ -55,6 +56,7 @@ impl Peers {
capabilities: p.info.capabilities,
user_agent: p.info.user_agent.clone(),
flags: State::Healthy,
last_banned: 0,
};
if let Err(e) = self.save_peer(&peer_data) {
error!(LOGGER, "Could not save connected peer: {:?}", e);
@ -75,7 +77,12 @@ impl Peers {
/// Get vec of peers we are currently connected to.
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let mut res = self.peers.read().unwrap().values().cloned().collect::<Vec<_>>();
let mut res = self.peers
.read()
.unwrap()
.values()
.cloned()
.collect::<Vec<_>>();
thread_rng().shuffle(&mut res);
res
}
@ -90,8 +97,8 @@ impl Peers {
self.connected_peers().len() as u32
}
// Return vec of connected peers that currently advertise more work (total_difficulty)
// than we do.
// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
let peers = self.connected_peers();
if peers.len() == 0 {
@ -102,13 +109,9 @@ impl Peers {
let mut max_peers = peers
.iter()
.filter(|x| {
match x.try_read() {
Ok(peer) => {
peer.info.total_difficulty > total_difficulty
},
Err(_) => false,
}
.filter(|x| match x.try_read() {
Ok(peer) => peer.info.total_difficulty > total_difficulty,
Err(_) => false,
})
.cloned()
.collect::<Vec<_>>();
@ -121,7 +124,7 @@ impl Peers {
pub fn more_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.more_work_peers().first() {
Some(x) => Some(x.clone()),
None => None
None => None,
}
}
@ -135,24 +138,18 @@ impl Peers {
let max_total_difficulty = peers
.iter()
.map(|x| {
match x.try_read() {
Ok(peer) => peer.info.total_difficulty.clone(),
Err(_) => Difficulty::zero(),
}
.map(|x| match x.try_read() {
Ok(peer) => peer.info.total_difficulty.clone(),
Err(_) => Difficulty::zero(),
})
.max()
.unwrap();
let mut max_peers = peers
.iter()
.filter(|x| {
match x.try_read() {
Ok(peer) => {
peer.info.total_difficulty == max_total_difficulty
},
Err(_) => false,
}
.filter(|x| match x.try_read() {
Ok(peer) => peer.info.total_difficulty == max_total_difficulty,
Err(_) => false,
})
.cloned()
.collect::<Vec<_>>();
@ -166,7 +163,7 @@ impl Peers {
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
match self.most_work_peers().first() {
Some(x) => Some(x.clone()),
None => None
None => None,
}
}
@ -185,6 +182,15 @@ impl Peers {
error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e);
}
if let Err(e) =
self.update_last_banned(peer_addr.clone(), time::now_utc().to_timespec().sec)
{
error!(
LOGGER,
"Couldn't update last_banned time {}: {:?}", peer_addr, e
);
}
if let Some(peer) = self.get_connected_peer(peer_addr) {
debug!(LOGGER, "Banning peer {}", peer_addr);
// setting peer status will get it removed at the next clean_peer
@ -205,8 +211,8 @@ impl Peers {
} else {
error!(LOGGER, "Couldn't unban {}: peer is not banned", peer_addr)
}
},
Err(e) => error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e)
}
Err(e) => error!(LOGGER, "Couldn't unban {}: {:?}", peer_addr, e),
};
}
@ -295,7 +301,16 @@ impl Peers {
/// Updates the state of a peer in store
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> {
self.store.update_state(peer_addr, new_state).map_err(From::from)
self.store
.update_state(peer_addr, new_state)
.map_err(From::from)
}
/// Updates the last banned time of a peer in store
pub fn update_last_banned(&self, peer_addr: SocketAddr, last_banned: i64) -> Result<(), Error> {
self.store
.update_last_banned(peer_addr, last_banned)
.map_err(From::from)
}
/// Iterate over the peer list and prune all peers we have
@ -337,20 +352,21 @@ impl Peers {
// map peers to addrs in a block to bound how long we keep the read lock for
let addrs = {
self.connected_peers().iter().map(|x| {
let p = x.read().unwrap();
p.info.addr.clone()
}).collect::<Vec<_>>()
self.connected_peers()
.iter()
.map(|x| {
let p = x.read().unwrap();
p.info.addr.clone()
})
.collect::<Vec<_>>()
};
// now remove them taking a short-lived write lock each time
// maybe better to take write lock once and remove them all?
for x in addrs
.iter()
.take(excess_count) {
let mut peers = self.peers.write().unwrap();
peers.remove(x);
}
for x in addrs.iter().take(excess_count) {
let mut peers = self.peers.write().unwrap();
peers.remove(x);
}
}
pub fn stop(self) {
@ -382,7 +398,7 @@ impl ChainAdapter for Peers {
true
}
}
fn headers_received(&self, headers: Vec<core::BlockHeader>, peer_addr:SocketAddr) {
fn headers_received(&self, headers: Vec<core::BlockHeader>, peer_addr: SocketAddr) {
self.adapter.headers_received(headers, peer_addr)
}
fn locate_headers(&self, hs: Vec<Hash>) -> Vec<core::BlockHeader> {
@ -416,6 +432,7 @@ impl NetAdapter for Peers {
capabilities: UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
last_banned: 0,
};
if let Err(e) = self.save_peer(&peer) {
error!(LOGGER, "Could not save received peer address: {:?}", e);
@ -427,7 +444,7 @@ impl NetAdapter for Peers {
debug!(
LOGGER,
"peer total_diff @ height (ping/pong): {}: {} @ {} \
vs us: {} @ {}",
vs us: {} @ {}",
addr,
diff,
height,

View file

@ -50,6 +50,8 @@ pub struct PeerData {
pub user_agent: String,
/// State the peer has been detected with.
pub flags: State,
/// The time the peer was last banned
pub last_banned: i64,
}
impl Writeable for PeerData {
@ -59,7 +61,8 @@ impl Writeable for PeerData {
writer,
[write_u32, self.capabilities.bits()],
[write_bytes, &self.user_agent],
[write_u8, self.flags as u8]
[write_u8, self.flags as u8],
[write_i64, self.last_banned]
);
Ok(())
}
@ -68,15 +71,17 @@ impl Writeable for PeerData {
impl Readable for PeerData {
fn read(reader: &mut Reader) -> Result<PeerData, ser::Error> {
let addr = SockAddr::read(reader)?;
let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8);
let (capab, ua, fl, lb) = ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64);
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?;
let last_banned = lb;
match State::from_u8(fl) {
Some(flags) => Ok(PeerData {
addr: addr.0,
capabilities: capabilities,
user_agent: user_agent,
flags: flags,
last_banned: last_banned,
}),
None => Err(ser::Error::CorruptedData),
}
@ -139,6 +144,14 @@ impl PeerStore {
peer.flags = new_state;
self.save_peer(&peer)
}
/// Convenience method to load a peer data, update its last banned time and
/// save it back.
pub fn update_last_banned(&self, peer_addr: SocketAddr, last_banned: i64) -> Result<(), Error> {
let mut peer = self.get_peer(peer_addr)?;
peer.last_banned = last_banned;
self.save_peer(&peer)
}
}
fn peer_key(peer_addr: SocketAddr) -> Vec<u8> {