From 7b9351864a747f5dcc170fd64181fb6d81c1da58 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Mon, 27 Nov 2017 23:44:33 -0500 Subject: [PATCH] Banning of misbehaving peer, applied to bad blocks Added support for peer banning on the p2p server. The peer status is changed and the peer is disconnected. A banned peer won't be able to reconnect as well. Tracking of chain errors due to a block that's intrinsically bad and banning of the peer that sent it. If we're syncing, resetting the header chain to the same as the main chain to force backtracking. --- chain/src/chain.rs | 11 ++++++ chain/src/types.rs | 17 ++++++++++ grin/src/adapters.rs | 14 +++++++- grin/src/seed.rs | 17 ++-------- p2p/src/peer.rs | 10 ++++-- p2p/src/protocol.rs | 2 +- p2p/src/server.rs | 81 +++++++++++++++++++++++++++++++++----------- p2p/src/store.rs | 15 +++----- p2p/src/types.rs | 3 +- 9 files changed, 120 insertions(+), 50 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 9ec706192..d25138826 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -287,6 +287,17 @@ impl Chain { sumtrees.roots() } + /// Reset the header head to the same as the main head. When sync is running, + /// the header head will go ahead to try to download as many as possible. + /// However if a block, when fully received, is found invalid, the header + /// head need to backtrack to the last known valid position. + pub fn reset_header_head(&self) -> Result<(), Error> { + let head = self.head.lock().unwrap(); + debug!(LOGGER, "Reset header head to {} at {}", + head.last_block_h, head.height); + self.store.save_header_head(&head).map_err(From::from) + } + /// returns the last n nodes inserted into the utxo sum tree /// returns sum tree hash plus output itself (as the sum is contained /// in the output anyhow) diff --git a/chain/src/types.rs b/chain/src/types.rs index ce485a372..9521c633e 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -100,6 +100,23 @@ impl From for Error { } } +impl Error { + /// Whether the error is due to a block that was intrinsically wrong + pub fn is_bad_block(&self) -> bool { + // shorter to match on all the "not the block's fault" errors + match *self { + Error::Unfit(_) | + Error::Orphan | + Error::StoreErr(_, _) | + Error::SerErr(_) | + Error::SumTreeErr(_)| + Error::GenesisBlockRequired | + Error::Other(_) => false, + _ => true, + } + } +} + /// The tip of a fork. A handle to the fork ancestry from its leaf in the /// blockchain tree. References the max height and the latest and previous /// blocks diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 4c99084bb..adc7c8744 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -61,7 +61,7 @@ impl NetAdapter for NetToChainAdapter { } } - fn block_received(&self, b: core::Block) { + fn block_received(&self, b: core::Block, addr: SocketAddr) { let bhash = b.hash(); debug!( LOGGER, @@ -75,6 +75,18 @@ impl NetAdapter for NetToChainAdapter { if let &Err(ref e) = &res { debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e); + + // if the peer sent us a block that's intrinsically bad, they're either + // mistaken or manevolent, both of which require a ban + if e.is_bad_block() { + self.p2p_server.borrow().ban_peer(&addr); + + // and if we're currently syncing, our header chain is now wrong, it + // needs to be reset + if self.is_syncing() { + self.chain.reset_header_head(); + } + } } } diff --git a/grin/src/seed.rs b/grin/src/seed.rs index 806625d7f..ec9b1fcba 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -93,21 +93,8 @@ impl Seeder { p2p_server.all_peers().len(), ); - // maintenance step first, clean up p2p server peers and mark bans - // if needed - let disconnected = p2p_server.clean_peers(); - for p in disconnected { - let p = p.read().unwrap(); - if p.is_banned() { - debug!(LOGGER, "Marking peer {} as banned.", p.info.addr); - let update_result = - p2p_server.update_state(p.info.addr, p2p::State::Banned); - match update_result { - Ok(()) => {} - Err(_) => {} - } - } - } + // maintenance step first, clean up p2p server peers + let _ = p2p_server.clean_peers(); // we don't have enough peers, getting more from db if p2p_server.peer_count() < PEER_PREFERRED_COUNT { diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index c54fac1fa..9ad4b062c 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -129,6 +129,12 @@ impl Peer { *state == State::Banned } + /// Set this peer status to banned + pub fn set_banned(&self) { + let mut state = self.state.write().unwrap(); + *state = State::Banned; + } + /// Bytes sent and received by this peer to the remote peer. pub fn transmitted_bytes(&self) -> (u64, u64) { self.proto.transmitted_bytes() @@ -224,9 +230,9 @@ impl NetAdapter for TrackingAdapter { self.adapter.transaction_received(tx) } - fn block_received(&self, b: core::Block) { + fn block_received(&self, b: core::Block, addr: SocketAddr) { self.push(b.hash()); - self.adapter.block_received(b) + self.adapter.block_received(b, addr) } fn headers_received(&self, bh: Vec, addr: SocketAddr) { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index f6f192891..ba67db6a9 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -185,7 +185,7 @@ fn handle_payload( Type::Block => { let b = ser::deserialize::(&mut &buf[..])?; let bh = b.hash(); - adapter.block_received(b); + adapter.block_received(b, addr); Ok(Some(bh)) } Type::GetHeaders => { diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 9ffcaa1a4..d7db193bc 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -17,7 +17,7 @@ use std::cell::RefCell; use std::collections::HashMap; -use std::net::SocketAddr; +use std::net::{SocketAddr, Shutdown}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -45,8 +45,8 @@ impl NetAdapter for DummyAdapter { Difficulty::one() } fn transaction_received(&self, _: core::Transaction) {} - fn block_received(&self, _: core::Block) {} - fn headers_received(&self, _: Vec, _: SocketAddr) {} + fn block_received(&self, _: core::Block, _: SocketAddr) {} + fn headers_received(&self, _: Vec, _:SocketAddr) {} fn locate_headers(&self, _: Vec) -> Vec { vec![] } @@ -108,30 +108,56 @@ impl Server { let peers = self.peers.clone(); let adapter = self.adapter.clone(); let capab = self.capabilities.clone(); + let store = self.store.clone(); // main peer acceptance future handling handshake let hp = h.clone(); let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| { + + // aaaand.. reclone for the internal closures + let adapter = adapter.clone(); + let store = store.clone(); let peers = peers.clone(); - let total_diff = adapter.total_difficulty(); + let handshake = handshake.clone(); + let hp = hp.clone(); - // accept the peer and add it to the server map - let accept = Peer::accept( - conn, - capab, - total_diff, - &handshake.clone(), - adapter.clone(), - ); - let added = add_to_peers(peers, adapter.clone(), accept); + future::ok(conn).and_then(move |conn| { + // Refuse banned peers connection + if let Ok(peer_addr) = conn.peer_addr() { + if let Ok(peer_data) = store.get_peer(peer_addr) { + if peer_data.flags == State::Banned { + debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr); + if let Err(e) = conn.shutdown(Shutdown::Both) { + debug!(LOGGER, "Error shutting down conn: {:?}", e); + } + return Err(Error::Banned) + } + } + } + Ok(conn) + }).and_then(move |conn| { - // wire in a future to timeout the accept after 5 secs - let timed_peer = with_timeout(Box::new(added), &hp); + let peers = peers.clone(); + let total_diff = adapter.total_difficulty(); - // run the main peer protocol - timed_peer.and_then(move |(conn, peer)| { - let peer = peer.read().unwrap(); - peer.run(conn) + // accept the peer and add it to the server map + let accept = Peer::accept( + conn, + capab, + total_diff, + &handshake.clone(), + adapter.clone(), + ); + let added = add_to_peers(peers, adapter.clone(), accept); + + // wire in a future to timeout the accept after 5 secs + let timed_peer = with_timeout(Box::new(added), &hp); + + // run the main peer protocol + timed_peer.and_then(move |(conn, peer)| { + let peer = peer.read().unwrap(); + peer.run(conn) + }) }) }); @@ -254,7 +280,7 @@ impl Server { // build a list of peers to be cleaned up for peer in self.connected_peers() { let peer_inner = peer.read().unwrap(); - if !peer_inner.is_connected() { + if peer_inner.is_banned() || !peer_inner.is_connected() { debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr); rm.push(peer.clone()); } @@ -350,6 +376,21 @@ impl Server { self.connected_peers().len() as u32 } + /// Bans a peer, disconnecting it if we're currently connected + pub fn ban_peer(&self, peer_addr: &SocketAddr) { + if let Err(e) = self.update_state(peer_addr.clone(), State::Banned) { + error!(LOGGER, "Couldn't ban {}: {:?}", peer_addr, e); + } + + if let Some(peer) = self.get_peer(peer_addr) { + debug!(LOGGER, "Banning peer {}", peer_addr); + // setting peer status will get it removed at the next clean_peer + let peer = peer.write().unwrap(); + peer.set_banned(); + peer.stop(); + } + } + /// Stops the server. Disconnect from all peers at the same time. pub fn stop(self) { info!(LOGGER, "calling stop on server"); diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 3dee7e421..cf03e391b 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -98,24 +98,19 @@ impl PeerStore { pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { debug!(LOGGER, "saving peer to store {:?}", p); - self.db.put_ser( - &to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], - p, - ) + self.db.put_ser(&peer_key(p.addr)[..], p) } - fn get_peer(&self, peer_addr: SocketAddr) -> Result { + pub fn get_peer(&self, peer_addr: SocketAddr) -> Result { option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..])) } pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { - self.db - .exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + self.db.exists(&peer_key(peer_addr)[..]) } pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { - self.db - .delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + self.db.delete(&peer_key(peer_addr)[..]) } pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { @@ -148,5 +143,5 @@ impl PeerStore { } fn peer_key(peer_addr: SocketAddr) -> Vec { - to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes()) + to_key(PEER_PREFIX, &mut format!("{}", peer_addr.ip()).into_bytes()) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 78d72c7b9..1cf6a0b59 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -41,6 +41,7 @@ pub const MAX_PEER_ADDRS: u32 = 256; pub enum Error { Serialization(ser::Error), Connection(io::Error), + Banned, ConnectionClose, Timeout, Store(grin_store::Error), @@ -169,7 +170,7 @@ pub trait NetAdapter: Sync + Send { fn transaction_received(&self, tx: core::Transaction); /// A block has been received from one of our peers - fn block_received(&self, b: core::Block); + fn block_received(&self, b: core::Block, addr: SocketAddr); /// A set of block header has been received, typically in response to a /// block