diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 35172649b..1a03de726 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -17,6 +17,7 @@ enum_primitive = "^0.1.0" num = "^0.1.36" grin_core = { path = "../core" } +grin_store = { path = "../store" } grin_util = { path = "../util" } [dev-dependencies] diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 939ca42b2..630ce3859 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -54,7 +54,7 @@ impl Handshake { let nonce = self.next_nonce(); let hand = Hand { version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, + capabilities: FULL_HIST, nonce: nonce, total_difficulty: total_difficulty, sender_addr: SockAddr(conn.local_addr().unwrap()), @@ -123,7 +123,7 @@ impl Handshake { // send our reply with our info let shake = Shake { version: PROTOCOL_VERSION, - capabilities: FULL_SYNC, + capabilities: FULL_HIST, total_difficulty: total_difficulty, user_agent: USER_AGENT.to_string(), }; diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 0ce8dfbb9..80d7b9332 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -26,6 +26,7 @@ extern crate bitflags; extern crate enum_primitive; #[macro_use] extern crate grin_core as core; +extern crate grin_store; extern crate grin_util as util; #[macro_use] extern crate log; @@ -43,6 +44,7 @@ mod msg; mod peer; mod protocol; mod server; +pub mod store; mod types; pub use server::{Server, DummyAdapter}; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index e982f18fc..e5398ac83 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -31,6 +31,7 @@ use types::*; /// Current latest version of the protocol pub const PROTOCOL_VERSION: u32 = 1; + /// Grin's user agent with current version (TODO externalize) pub const USER_AGENT: &'static str = "MW/Grin 0.1"; diff --git a/p2p/src/proto.rs b/p2p/src/proto.rs new file mode 100644 index 000000000..cadd61faa --- /dev/null +++ b/p2p/src/proto.rs @@ -0,0 +1,233 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{io, str}; +use std::convert::From; +use std::marker::PhantomData; +use std::net::SocketAddr; +use std::sync::Arc; +use std::thread; + +use net2; + +use futures::{future, Future, Stream}; +use tokio_core::io::{Io, Codec, EasyBuf, Framed}; +use tokio_core::net::{TcpStream, TcpListener}; +use tokio_core::reactor::{Core, Handle}; +use tokio_proto::{TcpClient, TcpServer, BindClient, BindServer}; +use tokio_proto::streaming::{Message, Body}; +use tokio_proto::streaming::multiplex::{Frame, ServerProto, ClientProto}; +use tokio_service::{Service, NewService}; + +use core::ser; +use msg::*; + +struct GrinCodec { + decoding_head: bool, +} + +impl Codec for GrinCodec { + type In = Frame<MsgHeader, Vec<u8>, io::Error>; + type Out = Frame<MsgHeader, Vec<u8>, io::Error>; + + fn encode(&mut self, msg: Self::In, mut buf: &mut Vec<u8>) -> io::Result<()> { + match msg { + Frame::Message{id, message, ..} => { + ser::serialize(&mut buf, &message).map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Encoding error: {:?}", e)))?; + }, + Frame::Body{id, chunk} => { + if let Some(chunk) = chunk { + buf.extend(chunk); + } + }, + Frame::Error{error, ..} => return Err(error), + } + Ok(()) + } + + fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::Out>, io::Error> { + unimplemented!(); + } +} + +struct GrinProto; + +impl <T: Io + 'static> ServerProto<T> for GrinProto { + type Request = MsgHeader; + type RequestBody = Vec<u8>; + type Response = MsgHeader; + type ResponseBody = Vec<u8>; + type Error = io::Error; + + type Transport = Framed<T, GrinCodec>; + type BindTransport = Result<Self::Transport, io::Error>; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(GrinCodec{decoding_head: true})) + } +} + +struct GrinReceiver; + +impl Service for GrinReceiver { + type Request = Message<MsgHeader, Body<Vec<u8>, io::Error>>; + type Response = Message<MsgHeader, Body<Vec<u8>, io::Error>>; + type Error = io::Error; + type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; + + fn call(&self, req: Self::Request) -> Self::Future { + let header = req.get_ref(); + let response = match header.msg_type { + Type::Ping => { + let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0)).unwrap(); + Message::WithoutBody(MsgHeader::new(Type::Pong, 0)) + }, + _ => { + unimplemented!() + } + }; + Box::new(future::ok(response)) + } +} + +struct GrinClient; + +impl Service for GrinClient { + type Request = Message<MsgHeader, Body<Vec<u8>, io::Error>>; + type Response = Message<MsgHeader, Body<Vec<u8>, io::Error>>; + type Error = io::Error; + type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; + + fn call(&self, req: Self::Request) -> Self::Future { + unimplemented!(); + } +} + +pub struct TcpClientServer<Kind, P> { + _kind: PhantomData<Kind>, + proto: Arc<P>, + threads: usize, + addr: SocketAddr, +} + +impl<Kind, P> TcpClientServer<Kind, P> where + P: BindServer<Kind, TcpStream> + BindClient<Kind, TcpStream> + Send + Sync + 'static { + + pub fn new(protocol: P, addr: SocketAddr) -> TcpClientServer<Kind, P> { + TcpClientServer{ + _kind: PhantomData, + proto: Arc::new(protocol), + threads: 1, + addr: addr, + } + } + + /// Set the number of threads running simultaneous event loops (Unix only). + pub fn threads(&mut self, threads: usize) { + assert!(threads > 0); + if cfg!(unix) { + self.threads = threads; + } + } + + /// Start up the server, providing the given service on it. + /// + /// This method will block the current thread until the server is shut down. + pub fn serve<S>(&self, new_service: S) where + S: NewService<Request = <P as BindServer<Kind, TcpStream>>::ServiceRequest, + Response = <P as BindServer<Kind, TcpStream>>::ServiceResponse, + Error = <P as BindServer<Kind, TcpStream>>::ServiceError> + Send + Sync + 'static, + { + let new_service = Arc::new(new_service); + self.with_handle(move |_| new_service.clone()) + } + + /// Start up the server, providing the given service on it, and providing + /// access to the event loop handle. + /// + /// The `new_service` argument is a closure that is given an event loop + /// handle, and produces a value implementing `NewService`. That value is in + /// turned used to make a new service instance for each incoming connection. + /// + /// This method will block the current thread until the server is shut down. + pub fn with_handle<F, S>(&self, new_service: F) where + F: Fn(&Handle) -> S + Send + Sync + 'static, + S: NewService<Request = <P as BindServer<Kind, TcpStream>>::ServiceRequest, + Response = <P as BindServer<Kind, TcpStream>>::ServiceResponse, + Error = <P as BindServer<Kind, TcpStream>>::ServiceError> + Send + Sync + 'static, + { + let proto = self.proto.clone(); + let new_service = Arc::new(new_service); + let addr = self.addr; + let workers = self.threads; + + let threads = (0..self.threads - 1).map(|i| { + let proto = proto.clone(); + let new_service = new_service.clone(); + + thread::Builder::new().name(format!("worker{}", i)).spawn(move || { + serve(proto, addr, workers, &*new_service) + }).unwrap() + }).collect::<Vec<_>>(); + + serve(proto, addr, workers, &*new_service); + + for thread in threads { + thread.join().unwrap(); + } + } +} + +fn serve<P, Kind, F, S>(binder: Arc<P>, addr: SocketAddr, workers: usize, new_service: &F) + where P: BindServer<Kind, TcpStream> + BindClient<Kind, TcpStream>, + F: Fn(&Handle) -> S, + S: NewService<Request = <P as BindServer<Kind, TcpStream>>::ServiceRequest, + Response = <P as BindServer<Kind, TcpStream>>::ServiceResponse, + Error = <P as BindServer<Kind, TcpStream>>::ServiceError> + 'static, +{ + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let new_service = new_service(&handle); + let listener = listener(&addr, workers, &handle).unwrap(); + + let server = listener.incoming().for_each(move |(socket, _)| { + // Create the service + let service = try!(new_service.new_service()); + + // Bind it! + binder.bind_server(&handle, socket, service); + binder.bind_client(&handle, socket); + + Ok(()) + }); + + core.run(server).unwrap(); +} + +fn listener(addr: &SocketAddr, + workers: usize, + handle: &Handle) -> io::Result<TcpListener> { + let listener = match *addr { + SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()), + SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()), + }; + // TODO re-add + // try!(configure_tcp(workers, &listener)); + try!(listener.reuse_address(true)); + try!(listener.bind(addr)); + listener.listen(1024).and_then(|l| { + TcpListener::from_listener(l, addr, handle) + }) +} + diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 9fbfe7df5..2abdcdee0 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -40,7 +40,7 @@ pub struct Peer { pub addr: SocketAddr, pub capabilities: Capabilities, pub user_agent: String, - pub flags: State + pub flags: State, } impl Writeable for Peer { @@ -49,9 +49,9 @@ impl Writeable for Peer { ser_multiwrite!(writer, [write_u32, self.capabilities.bits()], [write_bytes, &self.user_agent], - [write_u8, self.flags as u8]); - Ok(()) - } + [write_u8, self.flags as u8]); + Ok(()) + } } impl Readable<Peer> for Peer { @@ -60,18 +60,18 @@ impl Readable<Peer> for Peer { let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; - match State::from_u8(fl) { - Some(flags) => { - Ok(Peer { - addr: addr.0, - capabilities: capabilities, - user_agent: user_agent, - flags: flags, - }) - } + match State::from_u8(fl) { + Some(flags) => { + Ok(Peer { + addr: addr.0, + capabilities: capabilities, + user_agent: user_agent, + flags: flags, + }) + } None => Err(ser::Error::CorruptedData), - } - } + } + } } pub struct PeerStore { @@ -82,27 +82,29 @@ impl PeerStore { pub fn new(root_path: String) -> Result<PeerStore, Error> { let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?; Ok(PeerStore { db: db }) - } + } - pub fn save_peer(&self, p: &Peer) -> Result<(), Error> { - self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], p) - } + pub fn save_peer(&self, p: &Peer) -> Result<(), Error> { + self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], + p) + } - pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { + pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) - } + } - pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<Peer> { - let peers_iter = self.db.iter::<Peer>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); - let mut peers = Vec::with_capacity(count); - for p in peers_iter { - if p.flags == state && p.capabilities.contains(cap) { - peers.push(p); - } - if peers.len() >= count { - break; - } - } - peers - } + pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<Peer> { + let peers_iter = self.db + .iter::<Peer>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); + let mut peers = Vec::with_capacity(count); + for p in peers_iter { + if p.flags == state && p.capabilities.contains(cap) { + peers.push(p); + } + if peers.len() >= count { + break; + } + } + peers + } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 64457dc8b..0eba0ee5f 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -55,8 +55,11 @@ bitflags! { pub flags Capabilities: u32 { /// We don't know (yet) what the peer can do. const UNKNOWN = 0b00000000, - /// Runs with the easier version of the Proof of Work, mostly to make testing easier. - const FULL_SYNC = 0b00000001, + /// Full archival node, has the whole history without any pruning. + const FULL_HIST = 0b00000001, + /// Can provide block headers and the UTXO set for some recent-enough + /// height. + const UTXO_HIST = 0b00000010, } }