// Copyright 2018 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Storage implementation for peer data. use chrono::Utc; use num::FromPrimitive; use rand::{thread_rng, Rng}; use std::net::SocketAddr; use std::sync::Arc; use lmdb; use core::ser::{self, Readable, Reader, Writeable, Writer}; use grin_store::{self, option_to_not_found, to_key, Error}; use msg::SockAddr; use types::{Capabilities, ReasonForBan}; use util::LOGGER; const STORE_SUBPATH: &'static str = "peers"; const PEER_PREFIX: u8 = 'p' as u8; /// Types of messages enum_from_primitive! { #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub enum State { Healthy = 0, Banned = 1, Defunct = 2, } } /// Data stored for any given peer we've encountered. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerData { /// Network address of the peer. pub addr: SocketAddr, /// What capabilities the peer advertises. Unknown until a successful /// connection. pub capabilities: Capabilities, /// The peer user agent. pub user_agent: String, /// State the peer has been detected with. pub flags: State, /// The time the peer was last banned pub last_banned: i64, /// The reason for the ban pub ban_reason: ReasonForBan, } impl Writeable for PeerData { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { SockAddr(self.addr).write(writer)?; ser_multiwrite!( writer, [write_u32, self.capabilities.bits()], [write_bytes, &self.user_agent], [write_u8, self.flags as u8], [write_i64, self.last_banned], [write_i32, self.ban_reason as i32] ); Ok(()) } } impl Readable for PeerData { fn read(reader: &mut Reader) -> Result { let addr = SockAddr::read(reader)?; let (capab, ua, fl, lb, br) = ser_multiread!(reader, read_u32, read_vec, read_u8, read_i64, read_i32); let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; let last_banned = lb; let ban_reason = ReasonForBan::from_i32(br).ok_or(ser::Error::CorruptedData)?; match State::from_u8(fl) { Some(flags) => Ok(PeerData { addr: addr.0, capabilities: capabilities, user_agent: user_agent, flags: flags, last_banned: last_banned, ban_reason: ban_reason, }), None => Err(ser::Error::CorruptedData), } } } /// Storage facility for peer data. pub struct PeerStore { db: grin_store::Store, } impl PeerStore { /// Instantiates a new peer store under the provided root path. pub fn new(db_env: Arc) -> Result { let db = grin_store::Store::open(db_env, STORE_SUBPATH); Ok(PeerStore { db: db }) } pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { debug!(LOGGER, "save_peer: {:?} marked {:?}", p.addr, p.flags); let batch = self.db.batch()?; batch.put_ser(&peer_key(p.addr)[..], p)?; batch.commit() } pub fn get_peer(&self, peer_addr: SocketAddr) -> Result { option_to_not_found( self.db.get_ser(&peer_key(peer_addr)[..]), &format!("Peer at address: {}", peer_addr), ) } pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { self.db.exists(&peer_key(peer_addr)[..]) } /// TODO - allow below added to avoid github issue reports #[allow(dead_code)] pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { let batch = self.db.batch()?; batch.delete(&peer_key(peer_addr)[..])?; batch.commit() } pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { let mut peers = self .db .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())) .unwrap() .filter(|p| p.flags == state && p.capabilities.contains(cap)) .collect::>(); thread_rng().shuffle(&mut peers[..]); peers.iter().take(count).cloned().collect() } /// List all known peers /// Used for /v1/peers/all api endpoint pub fn all_peers(&self) -> Vec { let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes()); self.db.iter::(&key).unwrap().collect::>() } /// Convenience method to load a peer data, update its status and save it /// back. If new state is Banned its last banned time will be updated too. pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> { let batch = self.db.batch()?; let mut peer = option_to_not_found( batch.get_ser::(&peer_key(peer_addr)[..]), &format!("Peer at address: {}", peer_addr), )?; peer.flags = new_state; if new_state == State::Banned { peer.last_banned = Utc::now().timestamp(); } batch.put_ser(&peer_key(peer.addr)[..], &peer)?; batch.commit() } } fn peer_key(peer_addr: SocketAddr) -> Vec { to_key( PEER_PREFIX, &mut format!("{}:{}", peer_addr.ip(), peer_addr.port()).into_bytes(), ) }