mirror of
https://github.com/mimblewimble/grin.git
synced 2025-04-30 22:31:15 +03:00
A db store for peer data.
This commit is contained in:
parent
85edf57366
commit
786da24653
7 changed files with 280 additions and 38 deletions
|
@ -17,6 +17,7 @@ enum_primitive = "^0.1.0"
|
||||||
num = "^0.1.36"
|
num = "^0.1.36"
|
||||||
|
|
||||||
grin_core = { path = "../core" }
|
grin_core = { path = "../core" }
|
||||||
|
grin_store = { path = "../store" }
|
||||||
grin_util = { path = "../util" }
|
grin_util = { path = "../util" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
|
@ -54,7 +54,7 @@ impl Handshake {
|
||||||
let nonce = self.next_nonce();
|
let nonce = self.next_nonce();
|
||||||
let hand = Hand {
|
let hand = Hand {
|
||||||
version: PROTOCOL_VERSION,
|
version: PROTOCOL_VERSION,
|
||||||
capabilities: FULL_SYNC,
|
capabilities: FULL_HIST,
|
||||||
nonce: nonce,
|
nonce: nonce,
|
||||||
total_difficulty: total_difficulty,
|
total_difficulty: total_difficulty,
|
||||||
sender_addr: SockAddr(conn.local_addr().unwrap()),
|
sender_addr: SockAddr(conn.local_addr().unwrap()),
|
||||||
|
@ -123,7 +123,7 @@ impl Handshake {
|
||||||
// send our reply with our info
|
// send our reply with our info
|
||||||
let shake = Shake {
|
let shake = Shake {
|
||||||
version: PROTOCOL_VERSION,
|
version: PROTOCOL_VERSION,
|
||||||
capabilities: FULL_SYNC,
|
capabilities: FULL_HIST,
|
||||||
total_difficulty: total_difficulty,
|
total_difficulty: total_difficulty,
|
||||||
user_agent: USER_AGENT.to_string(),
|
user_agent: USER_AGENT.to_string(),
|
||||||
};
|
};
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern crate bitflags;
|
||||||
extern crate enum_primitive;
|
extern crate enum_primitive;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate grin_core as core;
|
extern crate grin_core as core;
|
||||||
|
extern crate grin_store;
|
||||||
extern crate grin_util as util;
|
extern crate grin_util as util;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
@ -43,6 +44,7 @@ mod msg;
|
||||||
mod peer;
|
mod peer;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
mod server;
|
mod server;
|
||||||
|
pub mod store;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
pub use server::{Server, DummyAdapter};
|
pub use server::{Server, DummyAdapter};
|
||||||
|
|
|
@ -31,6 +31,7 @@ use types::*;
|
||||||
|
|
||||||
/// Current latest version of the protocol
|
/// Current latest version of the protocol
|
||||||
pub const PROTOCOL_VERSION: u32 = 1;
|
pub const PROTOCOL_VERSION: u32 = 1;
|
||||||
|
|
||||||
/// Grin's user agent with current version (TODO externalize)
|
/// Grin's user agent with current version (TODO externalize)
|
||||||
pub const USER_AGENT: &'static str = "MW/Grin 0.1";
|
pub const USER_AGENT: &'static str = "MW/Grin 0.1";
|
||||||
|
|
||||||
|
|
233
p2p/src/proto.rs
Normal file
233
p2p/src/proto.rs
Normal file
|
@ -0,0 +1,233 @@
|
||||||
|
// Copyright 2016 The Grin Developers
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::{io, str};
|
||||||
|
use std::convert::From;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use net2;
|
||||||
|
|
||||||
|
use futures::{future, Future, Stream};
|
||||||
|
use tokio_core::io::{Io, Codec, EasyBuf, Framed};
|
||||||
|
use tokio_core::net::{TcpStream, TcpListener};
|
||||||
|
use tokio_core::reactor::{Core, Handle};
|
||||||
|
use tokio_proto::{TcpClient, TcpServer, BindClient, BindServer};
|
||||||
|
use tokio_proto::streaming::{Message, Body};
|
||||||
|
use tokio_proto::streaming::multiplex::{Frame, ServerProto, ClientProto};
|
||||||
|
use tokio_service::{Service, NewService};
|
||||||
|
|
||||||
|
use core::ser;
|
||||||
|
use msg::*;
|
||||||
|
|
||||||
|
struct GrinCodec {
|
||||||
|
decoding_head: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Codec for GrinCodec {
|
||||||
|
type In = Frame<MsgHeader, Vec<u8>, io::Error>;
|
||||||
|
type Out = Frame<MsgHeader, Vec<u8>, io::Error>;
|
||||||
|
|
||||||
|
fn encode(&mut self, msg: Self::In, mut buf: &mut Vec<u8>) -> io::Result<()> {
|
||||||
|
match msg {
|
||||||
|
Frame::Message{id, message, ..} => {
|
||||||
|
ser::serialize(&mut buf, &message).map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Encoding error: {:?}", e)))?;
|
||||||
|
},
|
||||||
|
Frame::Body{id, chunk} => {
|
||||||
|
if let Some(chunk) = chunk {
|
||||||
|
buf.extend(chunk);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Frame::Error{error, ..} => return Err(error),
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::Out>, io::Error> {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct GrinProto;
|
||||||
|
|
||||||
|
impl <T: Io + 'static> ServerProto<T> for GrinProto {
|
||||||
|
type Request = MsgHeader;
|
||||||
|
type RequestBody = Vec<u8>;
|
||||||
|
type Response = MsgHeader;
|
||||||
|
type ResponseBody = Vec<u8>;
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
type Transport = Framed<T, GrinCodec>;
|
||||||
|
type BindTransport = Result<Self::Transport, io::Error>;
|
||||||
|
|
||||||
|
fn bind_transport(&self, io: T) -> Self::BindTransport {
|
||||||
|
Ok(io.framed(GrinCodec{decoding_head: true}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct GrinReceiver;
|
||||||
|
|
||||||
|
impl Service for GrinReceiver {
|
||||||
|
type Request = Message<MsgHeader, Body<Vec<u8>, io::Error>>;
|
||||||
|
type Response = Message<MsgHeader, Body<Vec<u8>, io::Error>>;
|
||||||
|
type Error = io::Error;
|
||||||
|
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
|
||||||
|
|
||||||
|
fn call(&self, req: Self::Request) -> Self::Future {
|
||||||
|
let header = req.get_ref();
|
||||||
|
let response = match header.msg_type {
|
||||||
|
Type::Ping => {
|
||||||
|
let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0)).unwrap();
|
||||||
|
Message::WithoutBody(MsgHeader::new(Type::Pong, 0))
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Box::new(future::ok(response))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct GrinClient;
|
||||||
|
|
||||||
|
impl Service for GrinClient {
|
||||||
|
type Request = Message<MsgHeader, Body<Vec<u8>, io::Error>>;
|
||||||
|
type Response = Message<MsgHeader, Body<Vec<u8>, io::Error>>;
|
||||||
|
type Error = io::Error;
|
||||||
|
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
|
||||||
|
|
||||||
|
fn call(&self, req: Self::Request) -> Self::Future {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TcpClientServer<Kind, P> {
|
||||||
|
_kind: PhantomData<Kind>,
|
||||||
|
proto: Arc<P>,
|
||||||
|
threads: usize,
|
||||||
|
addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Kind, P> TcpClientServer<Kind, P> where
|
||||||
|
P: BindServer<Kind, TcpStream> + BindClient<Kind, TcpStream> + Send + Sync + 'static {
|
||||||
|
|
||||||
|
pub fn new(protocol: P, addr: SocketAddr) -> TcpClientServer<Kind, P> {
|
||||||
|
TcpClientServer{
|
||||||
|
_kind: PhantomData,
|
||||||
|
proto: Arc::new(protocol),
|
||||||
|
threads: 1,
|
||||||
|
addr: addr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the number of threads running simultaneous event loops (Unix only).
|
||||||
|
pub fn threads(&mut self, threads: usize) {
|
||||||
|
assert!(threads > 0);
|
||||||
|
if cfg!(unix) {
|
||||||
|
self.threads = threads;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start up the server, providing the given service on it.
|
||||||
|
///
|
||||||
|
/// This method will block the current thread until the server is shut down.
|
||||||
|
pub fn serve<S>(&self, new_service: S) where
|
||||||
|
S: NewService<Request = <P as BindServer<Kind, TcpStream>>::ServiceRequest,
|
||||||
|
Response = <P as BindServer<Kind, TcpStream>>::ServiceResponse,
|
||||||
|
Error = <P as BindServer<Kind, TcpStream>>::ServiceError> + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let new_service = Arc::new(new_service);
|
||||||
|
self.with_handle(move |_| new_service.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start up the server, providing the given service on it, and providing
|
||||||
|
/// access to the event loop handle.
|
||||||
|
///
|
||||||
|
/// The `new_service` argument is a closure that is given an event loop
|
||||||
|
/// handle, and produces a value implementing `NewService`. That value is in
|
||||||
|
/// turned used to make a new service instance for each incoming connection.
|
||||||
|
///
|
||||||
|
/// This method will block the current thread until the server is shut down.
|
||||||
|
pub fn with_handle<F, S>(&self, new_service: F) where
|
||||||
|
F: Fn(&Handle) -> S + Send + Sync + 'static,
|
||||||
|
S: NewService<Request = <P as BindServer<Kind, TcpStream>>::ServiceRequest,
|
||||||
|
Response = <P as BindServer<Kind, TcpStream>>::ServiceResponse,
|
||||||
|
Error = <P as BindServer<Kind, TcpStream>>::ServiceError> + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let proto = self.proto.clone();
|
||||||
|
let new_service = Arc::new(new_service);
|
||||||
|
let addr = self.addr;
|
||||||
|
let workers = self.threads;
|
||||||
|
|
||||||
|
let threads = (0..self.threads - 1).map(|i| {
|
||||||
|
let proto = proto.clone();
|
||||||
|
let new_service = new_service.clone();
|
||||||
|
|
||||||
|
thread::Builder::new().name(format!("worker{}", i)).spawn(move || {
|
||||||
|
serve(proto, addr, workers, &*new_service)
|
||||||
|
}).unwrap()
|
||||||
|
}).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
serve(proto, addr, workers, &*new_service);
|
||||||
|
|
||||||
|
for thread in threads {
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn serve<P, Kind, F, S>(binder: Arc<P>, addr: SocketAddr, workers: usize, new_service: &F)
|
||||||
|
where P: BindServer<Kind, TcpStream> + BindClient<Kind, TcpStream>,
|
||||||
|
F: Fn(&Handle) -> S,
|
||||||
|
S: NewService<Request = <P as BindServer<Kind, TcpStream>>::ServiceRequest,
|
||||||
|
Response = <P as BindServer<Kind, TcpStream>>::ServiceResponse,
|
||||||
|
Error = <P as BindServer<Kind, TcpStream>>::ServiceError> + 'static,
|
||||||
|
{
|
||||||
|
let mut core = Core::new().unwrap();
|
||||||
|
let handle = core.handle();
|
||||||
|
let new_service = new_service(&handle);
|
||||||
|
let listener = listener(&addr, workers, &handle).unwrap();
|
||||||
|
|
||||||
|
let server = listener.incoming().for_each(move |(socket, _)| {
|
||||||
|
// Create the service
|
||||||
|
let service = try!(new_service.new_service());
|
||||||
|
|
||||||
|
// Bind it!
|
||||||
|
binder.bind_server(&handle, socket, service);
|
||||||
|
binder.bind_client(&handle, socket);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
core.run(server).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listener(addr: &SocketAddr,
|
||||||
|
workers: usize,
|
||||||
|
handle: &Handle) -> io::Result<TcpListener> {
|
||||||
|
let listener = match *addr {
|
||||||
|
SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()),
|
||||||
|
SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()),
|
||||||
|
};
|
||||||
|
// TODO re-add
|
||||||
|
// try!(configure_tcp(workers, &listener));
|
||||||
|
try!(listener.reuse_address(true));
|
||||||
|
try!(listener.bind(addr));
|
||||||
|
listener.listen(1024).and_then(|l| {
|
||||||
|
TcpListener::from_listener(l, addr, handle)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ pub struct Peer {
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
pub capabilities: Capabilities,
|
pub capabilities: Capabilities,
|
||||||
pub user_agent: String,
|
pub user_agent: String,
|
||||||
pub flags: State
|
pub flags: State,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Writeable for Peer {
|
impl Writeable for Peer {
|
||||||
|
@ -49,9 +49,9 @@ impl Writeable for Peer {
|
||||||
ser_multiwrite!(writer,
|
ser_multiwrite!(writer,
|
||||||
[write_u32, self.capabilities.bits()],
|
[write_u32, self.capabilities.bits()],
|
||||||
[write_bytes, &self.user_agent],
|
[write_bytes, &self.user_agent],
|
||||||
[write_u8, self.flags as u8]);
|
[write_u8, self.flags as u8]);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Readable<Peer> for Peer {
|
impl Readable<Peer> for Peer {
|
||||||
|
@ -60,18 +60,18 @@ impl Readable<Peer> for Peer {
|
||||||
let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8);
|
let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8);
|
||||||
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
|
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
|
||||||
let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?;
|
let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?;
|
||||||
match State::from_u8(fl) {
|
match State::from_u8(fl) {
|
||||||
Some(flags) => {
|
Some(flags) => {
|
||||||
Ok(Peer {
|
Ok(Peer {
|
||||||
addr: addr.0,
|
addr: addr.0,
|
||||||
capabilities: capabilities,
|
capabilities: capabilities,
|
||||||
user_agent: user_agent,
|
user_agent: user_agent,
|
||||||
flags: flags,
|
flags: flags,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
None => Err(ser::Error::CorruptedData),
|
None => Err(ser::Error::CorruptedData),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PeerStore {
|
pub struct PeerStore {
|
||||||
|
@ -82,27 +82,29 @@ impl PeerStore {
|
||||||
pub fn new(root_path: String) -> Result<PeerStore, Error> {
|
pub fn new(root_path: String) -> Result<PeerStore, Error> {
|
||||||
let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?;
|
let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?;
|
||||||
Ok(PeerStore { db: db })
|
Ok(PeerStore { db: db })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn save_peer(&self, p: &Peer) -> Result<(), Error> {
|
pub fn save_peer(&self, p: &Peer) -> Result<(), Error> {
|
||||||
self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], p)
|
self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..],
|
||||||
}
|
p)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> {
|
pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> {
|
||||||
self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..])
|
self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<Peer> {
|
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<Peer> {
|
||||||
let peers_iter = self.db.iter::<Peer>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()));
|
let peers_iter = self.db
|
||||||
let mut peers = Vec::with_capacity(count);
|
.iter::<Peer>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()));
|
||||||
for p in peers_iter {
|
let mut peers = Vec::with_capacity(count);
|
||||||
if p.flags == state && p.capabilities.contains(cap) {
|
for p in peers_iter {
|
||||||
peers.push(p);
|
if p.flags == state && p.capabilities.contains(cap) {
|
||||||
}
|
peers.push(p);
|
||||||
if peers.len() >= count {
|
}
|
||||||
break;
|
if peers.len() >= count {
|
||||||
}
|
break;
|
||||||
}
|
}
|
||||||
peers
|
}
|
||||||
}
|
peers
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,11 @@ bitflags! {
|
||||||
pub flags Capabilities: u32 {
|
pub flags Capabilities: u32 {
|
||||||
/// We don't know (yet) what the peer can do.
|
/// We don't know (yet) what the peer can do.
|
||||||
const UNKNOWN = 0b00000000,
|
const UNKNOWN = 0b00000000,
|
||||||
/// Runs with the easier version of the Proof of Work, mostly to make testing easier.
|
/// Full archival node, has the whole history without any pruning.
|
||||||
const FULL_SYNC = 0b00000001,
|
const FULL_HIST = 0b00000001,
|
||||||
|
/// Can provide block headers and the UTXO set for some recent-enough
|
||||||
|
/// height.
|
||||||
|
const UTXO_HIST = 0b00000010,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue