From 85edf573669601395c3a0cd1c62c084bec83c4bd Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Thu, 9 Feb 2017 20:15:22 -0800 Subject: [PATCH] Iterator for Readable impls stored in DB. --- p2p/src/store.rs | 108 +++++++++++++++++++++++++++++++++++++++++++++++ store/src/lib.rs | 41 ++++++++++++++++-- 2 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 p2p/src/store.rs diff --git a/p2p/src/store.rs b/p2p/src/store.rs new file mode 100644 index 000000000..9fbfe7df5 --- /dev/null +++ b/p2p/src/store.rs @@ -0,0 +1,108 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Storage implementation for peer data. + +use std::net::SocketAddr; +use num::FromPrimitive; + +use core::ser::{self, Readable, Writeable, Reader, Writer}; +use grin_store::{self, Error, to_key}; +use msg::SockAddr; +use types::Capabilities; + +const STORE_SUBPATH: &'static str = "peers"; + +const PEER_PREFIX: u8 = 'p' as u8; + +/// Types of messages +enum_from_primitive! { + #[derive(Debug, Clone, Copy, PartialEq)] + pub enum State { + Healthy, + Banned, + Dead, + } +} + +pub struct Peer { + pub addr: SocketAddr, + pub capabilities: Capabilities, + pub user_agent: String, + pub flags: State +} + +impl Writeable for Peer { + fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { + SockAddr(self.addr).write(writer)?; + ser_multiwrite!(writer, + [write_u32, self.capabilities.bits()], + [write_bytes, &self.user_agent], + [write_u8, self.flags as u8]); + Ok(()) + } +} + +impl Readable for Peer { + fn read(reader: &mut Reader) -> Result { + let addr = SockAddr::read(reader)?; + let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); + let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; + let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; + match State::from_u8(fl) { + Some(flags) => { + Ok(Peer { + addr: addr.0, + capabilities: capabilities, + user_agent: user_agent, + flags: flags, + }) + } + None => Err(ser::Error::CorruptedData), + } + } +} + +pub struct PeerStore { + db: grin_store::Store, +} + +impl PeerStore { + pub fn new(root_path: String) -> Result { + let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?; + Ok(PeerStore { db: db }) + } + + pub fn save_peer(&self, p: &Peer) -> Result<(), Error> { + self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], p) + } + + pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { + self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + } + + pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { + let peers_iter = self.db.iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); + let mut peers = Vec::with_capacity(count); + for p in peers_iter { + if p.flags == state && p.capabilities.contains(cap) { + peers.push(p); + } + if peers.len() >= count { + break; + } + } + peers + } +} diff --git a/store/src/lib.rs b/store/src/lib.rs index f21bf298e..2dfdcdf92 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -27,10 +27,12 @@ extern crate rocksdb; const SEP: u8 = ':' as u8; use std::fmt; +use std::iter::Iterator; +use std::marker::PhantomData; use std::sync::RwLock; use byteorder::{WriteBytesExt, BigEndian}; -use rocksdb::{DB, WriteBatch, DBCompactionStyle}; +use rocksdb::{DB, WriteBatch, DBCompactionStyle, DBIterator, IteratorMode, Direction}; use core::ser; @@ -46,11 +48,10 @@ pub enum Error { SerErr(ser::Error), } - impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - &Error::NotFoundErr => write!(f, "Not Found"), + &Error::NotFoundErr => write!(f, "Not Found"), &Error::RocksDbErr(ref s) => write!(f, "RocksDb Error: {}", s), &Error::SerErr(ref e) => write!(f, "Serialization Error: {}", e.to_string()), } @@ -135,6 +136,17 @@ impl Store { db.delete(key).map_err(From::from) } + /// Produces an iterator of `Readable` types moving forward from the + /// provided + /// key. + pub fn iter>(&self, from: &[u8]) -> SerIterator { + let db = self.rdb.read().unwrap(); + SerIterator { + iter: db.iterator(IteratorMode::From(from, Direction::Forward)), + _marker: PhantomData, + } + } + /// Builds a new batch to be used with this store. pub fn batch(&self) -> Batch { Batch { @@ -175,6 +187,29 @@ impl<'a> Batch<'a> { } } +/// An iterator thad produces Readable instances back. Wraps the lower level +/// DBIterator and deserializes the returned values. +pub struct SerIterator + where T: ser::Readable +{ + iter: DBIterator, + _marker: PhantomData, +} + +impl Iterator for SerIterator + where T: ser::Readable +{ + type Item = T; + + fn next(&mut self) -> Option { + let next = self.iter.next(); + next.and_then(|r| { + let (_, v) = r; + ser::deserialize(&mut &v[..]).ok() + }) + } +} + /// Build a db key from a prefix and a byte vector identifier. pub fn to_key(prefix: u8, id: &mut Vec) -> &mut Vec { id.insert(0, SEP);