From de4ebdde71a3af1cb59c8bd29555412134d0436f Mon Sep 17 00:00:00 2001 From: Jacob Payne Date: Mon, 22 May 2017 10:16:13 -0700 Subject: [PATCH] [WIP] Partial Transition from Writeable/Readable to Codecs (#51) * Sample Signatures for put_enc and get_dec * Implement put_enc and get_dec * Implement ChainCodec in grin_chain * Truncate src only on complete Blocks * Truncate src only on complete Tip + Check Len * Move BlockHeader Encoding to BlockHeaderCodec * Define put_enc for store::Batch * Replace BlockCodec and BlockHeaderCodec with generic BlockCodec * Implement Default for BlockCodec Manually * Replace get_ser/put_ser with get_enc/get_dec for chain::ChainKVStore * Remove Writeable/Readable for chain::Tip * Add Tokio-io and Bytes to grin_p2p * Additional Setup for Message enum + Msg{Encode,Decode} traits * base msg ping pong encoding and test * fill out msg-codec tests * Implement Hand Encoding/Decoding * msg-encode shake * msg-encode getpeeraddr * codec peer-addrs message, SockAddr struct wierdness * header message codec * msg encoding finished prelim * Implement PeerCodec Encoding/Decoding * Set PeerStore to use PeerCodec for Encoding/Decoding * Add a DecIterator * Prune PeerStore * Replace Decoding and Encoding in handle_payload * Prune Writeable/Readable methods in store::Store * Remove Incomplete Frame Testing ( Not Nessesary right now ) * separate block and tx codec tests * Refactor {Tx,Block}Codec Tests --- chain/Cargo.toml | 3 + chain/src/codec.rs | 184 +++++++ chain/src/lib.rs | 4 + chain/src/store.rs | 65 ++- chain/src/types.rs | 26 - core/src/core/block.rs | 4 +- core/src/core/transaction.rs | 8 +- p2p/Cargo.toml | 5 +- p2p/src/lib.rs | 6 + p2p/src/msg.rs | 12 +- p2p/src/msg_codec.rs | 952 ++++++++++++++++++++++++++++++++++ p2p/src/peer_codec.rs | 144 +++++ p2p/src/proto.rs | 3 +- p2p/src/store.rs | 56 +- secp256k1zkp/src/pedersen.rs | 6 + store/src/codec/block.rs | 281 +++------- store/src/codec/block_test.rs | 185 +++++++ store/src/codec/mod.rs | 10 +- store/src/codec/tx.rs | 90 +--- store/src/codec/tx_test.rs | 93 ++++ store/src/lib.rs | 121 ++--- 21 files changed, 1789 insertions(+), 469 deletions(-) create mode 100644 chain/src/codec.rs create mode 100644 p2p/src/msg_codec.rs create mode 100644 p2p/src/peer_codec.rs create mode 100644 store/src/codec/block_test.rs create mode 100644 store/src/codec/tx_test.rs diff --git a/chain/Cargo.toml b/chain/Cargo.toml index f57d895cf..0719d3caf 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -11,6 +11,9 @@ log = "^0.3" serde = "~0.9.10" serde_derive = "~0.9.10" time = "^0.1" +tokio-io = "0.1.1" +bytes = "0.4.2" +num-bigint = "^0.1.35" grin_core = { path = "../core" } grin_store = { path = "../store" } diff --git a/chain/src/codec.rs b/chain/src/codec.rs new file mode 100644 index 000000000..da135c191 --- /dev/null +++ b/chain/src/codec.rs @@ -0,0 +1,184 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementation of the chain block encoding and decoding. + +use std::io; + +use tokio_io::*; +use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; +use num_bigint::BigUint; + +use types::Tip; +use core::core::hash::Hash; +use core::core::target::Difficulty; + +// Convenience Macro for Option Handling in Decoding +macro_rules! try_opt_dec { + ($e: expr) => (match $e { + Some(val) => val, + None => return Ok(None), + }); +} + +/// Codec for Decoding and Encoding a `Tip` +#[derive(Debug, Clone, Default)] +pub struct ChainCodec; + +impl codec::Encoder for ChainCodec { + type Item = Tip; + type Error = io::Error; + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + // Put Height + dst.reserve(8); + dst.put_u64::(item.height); + + // Put Last Block Hash + item.last_block_h.chain_encode(dst)?; + + // Put Previous Block Hash + item.prev_block_h.chain_encode(dst)?; + + // Put Difficulty + item.total_difficulty.chain_encode(dst)?; + + Ok(()) + } +} + +impl codec::Decoder for ChainCodec { + type Item = Tip; + type Error = io::Error; + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + + // Create Temporary Buffer + let ref mut temp = src.clone(); + + // Get Height + if temp.len() < 8 { + return Ok(None); + } + let mut buf = temp.split_to(8).into_buf(); + let height = buf.get_u64::(); + + // Get Last Block Hash + let last_block_h = try_opt_dec!(Hash::chain_decode(temp)?); + + // Get Previous Block Hash + let prev_block_h = try_opt_dec!(Hash::chain_decode(temp)?); + + // Get Difficulty + let total_difficulty = try_opt_dec!(Difficulty::chain_decode(temp)?); + + // If succesfull truncate src by bytes read from temp; + let diff = src.len() - temp.len(); + src.split_to(diff); + + Ok(Some(Tip { + height: height, + last_block_h: last_block_h, + prev_block_h: prev_block_h, + total_difficulty: total_difficulty, + })) + } +} + +/// Internal Convenience Trait +trait ChainEncode: Sized { + fn chain_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; +} + +/// Internal Convenience Trait +trait ChainDecode: Sized { + fn chain_decode(src: &mut BytesMut) -> Result, io::Error>; +} + +impl ChainEncode for Difficulty { + fn chain_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + let data = self.clone().into_biguint().to_bytes_be(); + dst.reserve(1 + data.len()); + dst.put_u8(data.len() as u8); + dst.put_slice(&data); + Ok(()) + } +} + +impl ChainDecode for Difficulty { + fn chain_decode(src: &mut BytesMut) -> Result, io::Error> { + if src.len() < 1 { + return Ok(None); + } + let mut buf = src.split_to(1).into_buf(); + let dlen = buf.get_u8() as usize; + + if src.len() < dlen { + return Ok(None); + } + + let buf = src.split_to(dlen).into_buf(); + let data = Buf::bytes(&buf); + + Ok(Some(Difficulty::from_biguint(BigUint::from_bytes_be(data)))) + } +} + +impl ChainEncode for Hash { + fn chain_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + dst.reserve(32); + dst.put_slice(self.as_ref()); + Ok(()) + } +} + +impl ChainDecode for Hash { + fn chain_decode(src: &mut BytesMut) -> Result, io::Error> { + if src.len() < 32 { + return Ok(None); + } + + let mut buf = src.split_to(32).into_buf(); + let mut hash_data = [0; 32]; + buf.copy_to_slice(&mut hash_data); + + Ok(Some(Hash(hash_data))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_have_chain_codec_roundtrip() { + use tokio_io::codec::{Encoder, Decoder}; + + let sample_gdb = Hash([1u8; 32]); + let tip = Tip::new(sample_gdb); + + let mut buf = BytesMut::with_capacity(0); + let mut codec = ChainCodec {}; + codec.encode(tip.clone(), &mut buf).expect("Error During Tip Encoding"); + + let d_tip = + codec.decode(&mut buf).expect("Error During Tip Decoding").expect("Unfinished Tip"); + + // Check if all bytes are read + assert_eq!(buf.len(), 0); + + assert_eq!(tip.height, d_tip.height); + assert_eq!(tip.last_block_h, d_tip.last_block_h); + assert_eq!(tip.prev_block_h, d_tip.prev_block_h); + assert_eq!(tip.total_difficulty, d_tip.total_difficulty); + } +} \ No newline at end of file diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 03f966941..15df331f2 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -29,6 +29,9 @@ extern crate serde; #[macro_use] extern crate serde_derive; extern crate time; +extern crate tokio_io; +extern crate bytes; +extern crate num_bigint; extern crate grin_core as core; extern crate grin_store; @@ -37,6 +40,7 @@ extern crate secp256k1zkp as secp; pub mod pipe; pub mod store; pub mod types; +pub mod codec; // Re-export the base interface diff --git a/chain/src/store.rs b/chain/src/store.rs index 70ab4bc25..a18e9aa40 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -17,9 +17,11 @@ use secp::pedersen::Commitment; use types::*; +use codec::ChainCodec; use core::core::hash::{Hash, Hashed}; use core::core::{Block, BlockHeader, Output}; use grin_store::{self, Error, to_key, u64_to_key, option_to_not_found}; +use grin_store::codec::BlockCodec; const STORE_SUBPATH: &'static str = "chain"; @@ -40,46 +42,49 @@ pub struct ChainKVStore { impl ChainKVStore { pub fn new(root_path: String) -> Result { let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?; + let codec = ChainCodec::default(); Ok(ChainKVStore { db: db }) } } impl ChainStore for ChainKVStore { fn head(&self) -> Result { - option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX])) + option_to_not_found(self.db.get_dec(&mut ChainCodec, &[HEAD_PREFIX])) } fn head_header(&self) -> Result { - let head: Tip = try!(option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]))); + let head: Tip = option_to_not_found(self.db.get_dec(&mut ChainCodec, &[HEAD_PREFIX]))?; self.get_block_header(&head.last_block_h) } fn save_head(&self, t: &Tip) -> Result<(), Error> { self.db .batch() - .put_ser(&vec![HEAD_PREFIX], t)? - .put_ser(&vec![HEADER_HEAD_PREFIX], t)? + .put_enc(&mut ChainCodec, &[HEAD_PREFIX], t.clone())? + .put_enc(&mut ChainCodec, &[HEADER_HEAD_PREFIX], t.clone())? .write() } fn save_body_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_ser(&vec![HEAD_PREFIX], t) + self.db.put_enc(&mut ChainCodec, &[HEAD_PREFIX], t.clone()) } fn get_header_head(&self) -> Result { - option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX])) + option_to_not_found(self.db.get_dec(&mut ChainCodec, &[HEADER_HEAD_PREFIX])) } fn save_header_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t) + self.db.put_enc(&mut ChainCodec, &[HEADER_HEAD_PREFIX], t.clone()) } fn get_block(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec()))) + option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), + &to_key(BLOCK_PREFIX, &mut h.to_vec()))) } fn get_block_header(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec()))) + option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), + &to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec()))) } fn check_block_exists(&self, h: &Hash) -> Result { @@ -90,39 +95,52 @@ impl ChainStore for ChainKVStore { // saving the block and its header let mut batch = self.db .batch() - .put_ser(&to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..], b)? - .put_ser(&to_key(BLOCK_HEADER_PREFIX, &mut b.hash().to_vec())[..], - &b.header)?; + .put_enc(&mut BlockCodec::default(), + &to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..], + b.clone())? + .put_enc(&mut BlockCodec::default(), + &to_key(BLOCK_HEADER_PREFIX, &mut b.hash().to_vec())[..], + b.header.clone())?; // saving the full output under its hash, as well as a commitment to hash index for out in &b.outputs { - batch = batch.put_ser(&to_key(OUTPUT_PREFIX, &mut out.hash().to_vec())[..], out)? - .put_ser(&to_key(OUTPUT_COMMIT_PREFIX, &mut out.commit.as_ref().to_vec())[..], - &out.hash())?; + batch = batch.put_enc(&mut BlockCodec::default(), + &to_key(OUTPUT_PREFIX, &mut out.hash().to_vec())[..], + out.clone())? + .put_enc(&mut BlockCodec::default(), + &to_key(OUTPUT_COMMIT_PREFIX, &mut out.commit.as_ref().to_vec())[..], + out.hash().clone())?; } batch.write() } fn save_block_header(&self, bh: &BlockHeader) -> Result<(), Error> { - self.db.put_ser(&to_key(BLOCK_HEADER_PREFIX, &mut bh.hash().to_vec())[..], - bh) + self.db.put_enc(&mut BlockCodec::default(), + &to_key(BLOCK_HEADER_PREFIX, &mut bh.hash().to_vec())[..], + bh.clone()) } fn get_header_by_height(&self, height: u64) -> Result { - option_to_not_found(self.db.get_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, height))) + option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), + &u64_to_key(HEADER_HEIGHT_PREFIX, height))) } fn get_output(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(OUTPUT_PREFIX, &mut h.to_vec()))) + option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), + &to_key(OUTPUT_PREFIX, &mut h.to_vec()))) } fn has_output_commit(&self, commit: &Commitment) -> Result { option_to_not_found(self.db - .get_ser(&to_key(OUTPUT_COMMIT_PREFIX, &mut commit.as_ref().to_vec()))) + .get_dec(&mut BlockCodec::default(), + &to_key(OUTPUT_COMMIT_PREFIX, &mut commit.as_ref().to_vec()))) } fn setup_height(&self, bh: &BlockHeader) -> Result<(), Error> { - self.db.put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), bh)?; + self.db + .put_enc(&mut BlockCodec::default(), + &u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), + bh.clone())?; let mut prev_h = bh.previous; let mut prev_height = bh.height - 1; @@ -131,8 +149,9 @@ impl ChainStore for ChainKVStore { if prev.hash() != prev_h { let real_prev = self.get_block_header(&prev_h)?; self.db - .put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, real_prev.height), - &real_prev); + .put_enc(&mut BlockCodec::default(), + &u64_to_key(HEADER_HEIGHT_PREFIX, real_prev.height), + real_prev.clone())?; prev_h = real_prev.previous; prev_height = real_prev.height - 1; } else { diff --git a/chain/src/types.rs b/chain/src/types.rs index 59f89546a..49a9b2edb 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -20,7 +20,6 @@ use grin_store::Error; use core::core::{Block, BlockHeader, Output}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use core::ser; /// The tip of a fork. A handle to the fork ancestry from its leaf in the /// blockchain tree. References the max height and the latest and previous @@ -60,31 +59,6 @@ impl Tip { } } -/// Serialization of a tip, required to save to datastore. -impl ser::Writeable for Tip { - fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - try!(writer.write_u64(self.height)); - try!(writer.write_fixed_bytes(&self.last_block_h)); - try!(writer.write_fixed_bytes(&self.prev_block_h)); - self.total_difficulty.write(writer) - } -} - -impl ser::Readable for Tip { - fn read(reader: &mut ser::Reader) -> Result { - let height = try!(reader.read_u64()); - let last = try!(Hash::read(reader)); - let prev = try!(Hash::read(reader)); - let diff = try!(Difficulty::read(reader)); - Ok(Tip { - height: height, - last_block_h: last, - prev_block_h: prev, - total_difficulty: diff, - }) - } -} - /// Trait the chain pipeline requires an implementor for in order to process /// blocks. pub trait ChainStore: Send + Sync { diff --git a/core/src/core/block.rs b/core/src/core/block.rs index cbc359c74..dc52c3541 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -36,7 +36,7 @@ bitflags! { } /// Block header, fairly standard compared to other blockchains. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct BlockHeader { /// Height of this block since the genesis block (height 0) pub height: u64, @@ -139,7 +139,7 @@ impl Readable for BlockHeader { /// non-explicit, assumed to be deducible from block height (similar to /// bitcoin's schedule) and expressed as a global transaction fee (added v.H), /// additive to the total of fees ever collected. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Block { /// The header with metadata and commitments to the rest of the data pub header: BlockHeader, diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 21816d522..2c0f8fbd8 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -37,7 +37,7 @@ bitflags! { /// Pedersen commitment and the signature, that guarantees that the commitments /// amount to zero. The signature signs the fee, which is retained for /// signature validation. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct TxKernel { /// Options for a kernel's structure or use pub features: KernelFeatures, @@ -88,7 +88,7 @@ impl TxKernel { } /// A transaction -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Transaction { /// Set of inputs spent by the transaction. pub inputs: Vec, @@ -239,7 +239,7 @@ impl Transaction { /// A transaction input, mostly a reference to an output being spent by the /// transaction. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq)] pub struct Input(pub Commitment); /// Implementation of Writeable for a transaction Input, defines how to write @@ -282,7 +282,7 @@ bitflags! { /// range /// proof guarantees the commitment includes a positive value without overflow /// and the ownership of the private key. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq)] pub struct Output { /// Options for an output's structure or use pub features: OutputFeatures, diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 1614ba775..32a1e564e 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -5,6 +5,8 @@ authors = ["Ignotus Peverell "] workspace = ".." [dependencies] +tokio-io = "0.1.1" +bytes = "0.4.2" bitflags = "^0.7.0" byteorder = "^0.5" futures = "^0.1.9" @@ -15,6 +17,7 @@ serde = "~0.9.10" serde_derive = "~0.9.10" tokio-core="^0.1.1" tokio-timer="^0.1.0" + time = "^0.1" enum_primitive = "^0.1.0" num = "^0.1.36" @@ -22,7 +25,7 @@ num = "^0.1.36" grin_core = { path = "../core" } grin_store = { path = "../store" } grin_util = { path = "../util" } +secp256k1zkp = { path = "../secp256k1zkp" } [dev-dependencies] env_logger = "^0.3" -secp256k1zkp = { path = "../secp256k1zkp" } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index f68d4e968..5953b5422 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -34,12 +34,16 @@ extern crate futures; #[macro_use] extern crate tokio_core; extern crate tokio_timer; +extern crate tokio_io; +extern crate bytes; extern crate rand; extern crate serde; #[macro_use] extern crate serde_derive; extern crate time; extern crate num; +extern crate secp256k1zkp as secp; + mod conn; pub mod handshake; @@ -49,6 +53,8 @@ mod protocol; mod server; mod store; mod types; +mod msg_codec; +mod peer_codec; pub use server::{Server, DummyAdapter}; pub use peer::Peer; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index b053fe41d..8240a16bc 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -36,7 +36,7 @@ pub const PROTOCOL_VERSION: u32 = 1; pub const USER_AGENT: &'static str = "MW/Grin 0.1"; /// Magic number expected in the header of every message -const MAGIC: [u8; 2] = [0x1e, 0xc5]; +pub const MAGIC: [u8; 2] = [0x1e, 0xc5]; /// Size in bytes of a message header pub const HEADER_LEN: u64 = 11; @@ -123,7 +123,7 @@ pub fn write_msg(conn: TcpStream, /// Header of any protocol message, used to identify incoming messages. pub struct MsgHeader { - magic: [u8; 2], + pub magic: [u8; 2], /// Type of the message. pub msg_type: Type, /// Tota length of the message in bytes. @@ -177,6 +177,7 @@ impl Readable for MsgHeader { /// First part of a handshake, sender advertises its version and /// characteristics. +#[derive(Clone, Debug, PartialEq)] pub struct Hand { /// protocol version of the sender pub version: u32, @@ -232,6 +233,7 @@ impl Readable for Hand { /// Second part of a handshake, receiver of the first part replies with its own /// version and characteristics. +#[derive(Clone, Debug, PartialEq)] pub struct Shake { /// sender version pub version: u32, @@ -273,6 +275,7 @@ impl Readable for Shake { } /// Ask for other peers addresses, required for network discovery. +#[derive(Clone, Debug, PartialEq)] pub struct GetPeerAddrs { /// Filters on the capabilities we'd like the peers to have pub capabilities: Capabilities, @@ -294,6 +297,7 @@ impl Readable for GetPeerAddrs { /// Peer addresses we know of that are fresh enough, in response to /// GetPeerAddrs. +#[derive(Clone, Debug, PartialEq)] pub struct PeerAddrs { pub peers: Vec, } @@ -327,6 +331,7 @@ impl Readable for PeerAddrs { /// We found some issue in the communication, sending an error back, usually /// followed by closing the connection. +#[derive(Clone, Debug, PartialEq)] pub struct PeerError { /// error code pub code: u32, @@ -355,6 +360,7 @@ impl Readable for PeerError { /// Only necessary so we can implement Readable and Writeable. Rust disallows /// implementing traits when both types are outside of this crate (which is the /// case for SocketAddr and Readable/Writeable). +#[derive(Clone, Debug, PartialEq)] pub struct SockAddr(pub SocketAddr); impl Writeable for SockAddr { @@ -408,6 +414,7 @@ impl Readable for SockAddr { } /// Serializable wrapper for the block locator. +#[derive(Clone, Debug, PartialEq)] pub struct Locator { pub hashes: Vec, } @@ -434,6 +441,7 @@ impl Readable for Locator { } /// Serializable wrapper for a list of block headers. +#[derive(Clone, Debug, PartialEq)] pub struct Headers { pub headers: Vec, } diff --git a/p2p/src/msg_codec.rs b/p2p/src/msg_codec.rs new file mode 100644 index 000000000..ff7feb287 --- /dev/null +++ b/p2p/src/msg_codec.rs @@ -0,0 +1,952 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementation of the p2p message encoding and decoding. + +use std::io; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr, IpAddr}; + +use tokio_io::*; +use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; +use tokio_io::codec::{Encoder, Decoder}; +use enum_primitive::FromPrimitive; + +use core::core::{Block, BlockHeader, Input, Output, Transaction, TxKernel}; +use core::core::hash::Hash; +use core::core::target::Difficulty; +use core::core::transaction::{OutputFeatures, KernelFeatures}; +use types::*; + +use secp::pedersen::{RangeProof, Commitment}; +use secp::constants::PEDERSEN_COMMITMENT_SIZE; + +use grin_store::codec::{BlockCodec, TxCodec}; + +use msg::*; +use msg::MsgHeader; + +const MSG_HEADER_SIZE: usize = 11; +const SOCKET_ADDR_MARKER_V4: u8 = 0; +const SOCKET_ADDR_MARKER_V6: u8 = 1; + +// Convenience Macro for Option Handling in Decoding +macro_rules! try_opt_dec { + ($e: expr) => (match $e { + Some(val) => val, + None => return Ok(None), + }); +} + +#[derive(Clone, Debug, PartialEq)] +enum Message { + PeerError(PeerError), + Hand(Hand), + Shake(Shake), + Ping, + Pong, + GetPeerAddrs(GetPeerAddrs), + PeerAddrs(PeerAddrs), + GetHeaders(Locator), + Headers(Headers), + GetBlock(Hash), + Block(Block), + Transaction(Transaction), +} + +/// Codec for Decoding and Encoding a `MsgHeader` +#[derive(Debug, Clone, Default)] +struct MsgCodec; + +impl codec::Encoder for MsgCodec { + type Item = Message; + type Error = io::Error; + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.reserve(MSG_HEADER_SIZE); + + let mut msg_dst = BytesMut::with_capacity(0); + + let header = match item { + Message::Pong => MsgHeader::new(Type::Pong, 0), + Message::Ping => MsgHeader::new(Type::Ping, 0), + Message::Hand(hand) => { + hand.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::Hand, msg_dst.len() as u64) + } + Message::Shake(shake) => { + shake.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::Shake, msg_dst.len() as u64) + } + Message::GetPeerAddrs(get_peer_addrs) => { + get_peer_addrs.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::GetPeerAddrs, msg_dst.len() as u64) + } + Message::PeerAddrs(peer_addrs) => { + peer_addrs.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::PeerAddrs, msg_dst.len() as u64) + } + Message::Headers(headers) => { + headers.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::Headers, msg_dst.len() as u64) + } + Message::GetHeaders(locator) => { + locator.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::GetHeaders, msg_dst.len() as u64) + } + Message::Block(block) => { + let mut codec = BlockCodec::default(); + codec.encode(block, &mut msg_dst)?; + MsgHeader::new(Type::Block, msg_dst.len() as u64) + } + Message::GetBlock(hash) => { + let mut codec = BlockCodec::default(); + codec.encode(hash, &mut msg_dst)?; + MsgHeader::new(Type::GetBlock, msg_dst.len() as u64) + } + Message::Transaction(tx) => { + let mut codec = TxCodec::default(); + codec.encode(tx, &mut msg_dst)?; + MsgHeader::new(Type::Transaction, msg_dst.len() as u64) + } + Message::PeerError(err) => { + err.msg_encode(&mut msg_dst)?; + MsgHeader::new(Type::Error, msg_dst.len() as u64) + } + }; + + // Put MsgHeader + dst.put_slice(&header.magic); + dst.put_u8(header.msg_type as u8); + dst.put_u64::(header.msg_len); + + // Put Body + dst.reserve(msg_dst.len()); + dst.put_slice(&msg_dst); + + Ok(()) + + } +} + +impl codec::Decoder for MsgCodec { + type Item = Message; + type Error = io::Error; + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // Create Temporary Buffer + let ref mut temp_src = src.clone(); + + // Decode Header + if temp_src.len() < MSG_HEADER_SIZE { + return Ok(None); + } + let mut buf = temp_src.split_to(MSG_HEADER_SIZE).into_buf(); + + // Get Magic + let mut some_magic = [0; 2]; + buf.copy_to_slice(&mut some_magic); + + // If Magic is invalid return error. + if some_magic != MAGIC { + return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Header")); + } + + // Ensure Valid Message Type + let msg_type = match Type::from_u8(buf.get_u8()) { + Some(t) => t, + None => { + return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Message Type")); + } + }; + + // Ensure sufficient data + let msg_len = buf.get_u64::() as usize; + if temp_src.len() < msg_len { + return Ok(None); + } + + // Attempt message body decode + let decoded_msg = match msg_type { + Type::Ping => Message::Ping, + Type::Pong => Message::Pong, + Type::Hand => { + let hand = try_opt_dec!(Hand::msg_decode(temp_src)?); + Message::Hand(hand) + } + Type::Shake => { + let shake = try_opt_dec!(Shake::msg_decode(temp_src)?); + Message::Shake(shake) + } + Type::GetPeerAddrs => { + let get_peer_addrs = try_opt_dec!(GetPeerAddrs::msg_decode(temp_src)?); + Message::GetPeerAddrs(get_peer_addrs) + } + Type::PeerAddrs => { + let peer_addrs = try_opt_dec!(PeerAddrs::msg_decode(temp_src)?); + Message::PeerAddrs(peer_addrs) + } + Type::Headers => { + let headers = try_opt_dec!(Headers::msg_decode(temp_src)?); + Message::Headers(headers) + } + Type::GetHeaders => { + let locator = try_opt_dec!(Locator::msg_decode(temp_src)?); + Message::GetHeaders(locator) + } + Type::Block => { + let mut codec = BlockCodec::default(); + let block = try_opt_dec!(codec.decode(temp_src)?); + Message::Block(block) + } + Type::GetBlock => { + let mut codec = BlockCodec::default(); + let hash = try_opt_dec!(codec.decode(temp_src)?); + Message::GetBlock(hash) + } + Type::Transaction => { + let mut codec = TxCodec::default(); + let transaction = try_opt_dec!(codec.decode(temp_src)?); + Message::Transaction(transaction) + } + Type::Error => { + let err = try_opt_dec!(PeerError::msg_decode(temp_src)?); + Message::PeerError(err) + } + }; + + // If succesfull truncate src by bytes read from temp_src; + let diff = src.len() - temp_src.len(); + src.split_to(diff); + + Ok(Some(decoded_msg)) + } +} + +// Internal Convenience Traits +pub trait MsgEncode: Sized { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; +} + +// Internal Convenience Trait +pub trait MsgDecode: Sized { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error>; +} + +impl MsgEncode for Hand { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + // Reserve for version(4), capabilities(4), nonce(8) + dst.reserve(16); + // Put Protocol Version + dst.put_u32::(self.version); + // Put Capabilities + dst.put_u32::(self.capabilities.bits()); + // Put Nonce + dst.put_u64::(self.nonce); + + // Put Difficulty with BlockCodec + BlockCodec::default() + .encode(self.total_difficulty.clone(), dst)?; + + // Put Sender Address + self.sender_addr.0.msg_encode(dst)?; + // Put Receier Address + self.receiver_addr.0.msg_encode(dst)?; + + // Put Size of String + let str_bytes = self.user_agent.as_bytes(); + dst.reserve(str_bytes.len() + 1); + + // Put Software Version + dst.put_u8(str_bytes.len() as u8); + dst.put_slice(str_bytes); + + Ok(()) + } +} + +impl MsgDecode for Hand { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { + // TODO: Check for Full Hand Size Upfront + if src.len() < 16 { + return Ok(None); + } + // Get Protocol Version, Capabilities, Nonce + let mut buf = src.split_to(16).into_buf(); + let version = buf.get_u32::(); + let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); + let nonce = buf.get_u64::(); + + // Get Total Difficulty + let total_difficulty = try_opt_dec!(BlockCodec::default().decode(src)?); + + // Get Sender and Receiver Addresses + let sender_addr = try_opt_dec!(SocketAddr::msg_decode(src)?); + let receiver_addr = try_opt_dec!(SocketAddr::msg_decode(src)?); + + + // Get Software Version + // TODO: Decide on Hand#user_agent size + if src.len() < 1 { + return Ok(None); + } + let mut buf = src.split_to(1).into_buf(); + let str_len = buf.get_u8() as usize; + if src.len() < str_len { + return Ok(None); + } + let buf = src.split_to(str_len).into_buf(); + let user_agent = String::from_utf8(buf.collect()) + .map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version") + })?; + + Ok(Some(Hand { + version: version, + capabilities: capabilities, + nonce: nonce, + total_difficulty: total_difficulty, + sender_addr: SockAddr(sender_addr), + receiver_addr: SockAddr(receiver_addr), + user_agent: user_agent, + })) + + } +} + +impl MsgEncode for Shake { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + // Reserve for version(4), capabilities(4) + dst.reserve(8); + // Put Protocol Version + dst.put_u32::(self.version); + // Put Capabilities + dst.put_u32::(self.capabilities.bits()); + + // Put Difficulty with BlockCodec + BlockCodec::default() + .encode(self.total_difficulty.clone(), dst)?; + + // Reserve for user agent string Size of String + let str_bytes = self.user_agent.as_bytes(); + dst.reserve(str_bytes.len() + 1); + + // Put user agent string + dst.put_u8(str_bytes.len() as u8); + dst.put_slice(str_bytes); + + Ok(()) + } +} + +impl MsgDecode for Shake { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { + // TODO: Check for Full Hand Size Upfront + if src.len() < 8 { + return Ok(None); + } + // Get Protocol Version, Capabilities, Nonce + let mut buf = src.split_to(8).into_buf(); + let version = buf.get_u32::(); + let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); + + // Get Total Difficulty + let total_difficulty = try_opt_dec!(BlockCodec::default().decode(src)?); + + // Get Software Version + // TODO: Decide on Hand#user_agent size + if src.len() < 1 { + return Ok(None); + } + let mut buf = src.split_to(1).into_buf(); + let str_len = buf.get_u8() as usize; + if src.len() < str_len { + return Ok(None); + } + let buf = src.split_to(str_len).into_buf(); + let user_agent = String::from_utf8(buf.collect()) + .map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version") + })?; + + Ok(Some(Shake { + version: version, + capabilities: capabilities, + total_difficulty: total_difficulty, + user_agent: user_agent, + })) + + } +} + +impl MsgEncode for GetPeerAddrs { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + // Reserve for and put Capabilities + dst.reserve(4); + dst.put_u32::(self.capabilities.bits()); + Ok(()) + } +} + +impl MsgDecode for GetPeerAddrs { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { + if src.len() < 4 { + return Ok(None); + } + let mut buf = src.split_to(4).into_buf(); + let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); + Ok(Some(GetPeerAddrs { capabilities: capabilities })) + } +} + +impl MsgEncode for PeerAddrs { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + dst.reserve(4); + dst.put_u32::(self.peers.len() as u32); + for p in &self.peers { + p.0.msg_encode(dst)?; + } + Ok(()) + } +} + +impl MsgDecode for PeerAddrs { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { + if src.len() < 4 { + return Ok(None); + } + let mut buf = src.split_to(4).into_buf(); + let peer_count = buf.get_u32::(); + // Check peer count is valid, return error or empty if applicable + if peer_count > MAX_PEER_ADDRS { + return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Count")); + } else if peer_count == 0 { + return Ok(Some(PeerAddrs { peers: vec![] })); + } + + let mut peers = Vec::with_capacity(peer_count as usize); + for _ in 0..peer_count { + let p = SocketAddr::msg_decode(src)?; + // XXX: Do not need SockAddr wrapper?? + peers.push(SockAddr(p.unwrap())); + } + + Ok(Some(PeerAddrs { peers: peers })) + } +} + +impl MsgEncode for Headers { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + dst.reserve(2); + dst.put_u16::(self.headers.len() as u16); + let mut block_codec = BlockCodec::default(); + for h in &self.headers { + block_codec.encode(h.clone(), dst)?; + } + + Ok(()) + } +} + +impl MsgDecode for Headers { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { + if src.len() < 2 { + return Ok(None); + } + // Get Headers Length + let mut buf = src.split_to(2).into_buf(); + let len = buf.get_u16::(); + + // Collect Headers + let mut headers = Vec::with_capacity(len as usize); + let mut block_codec = BlockCodec::default(); + for _ in 0..len { + let block = try_opt_dec!(block_codec.decode(src)?); + headers.push(block); + } + Ok(Some(Headers { headers: headers })) + } +} + +impl MsgEncode for Locator { + fn msg_encode(&self, dst: &mut BytesMut) -> Result< (), io::Error > { + dst.reserve(1); + let len = self.hashes.len() as u8; + dst.put_u8(len); + + let mut block_codec = BlockCodec::default(); + for h in &self.hashes { + block_codec.encode(h.clone(), dst)?; + } + Ok(()) + } +} + +impl MsgDecode for Locator { + fn msg_decode(src: &mut BytesMut) -> Result< Option, io::Error > { + if src.len() < 1 { + return Ok(None); + } + let mut buf = src.split_to(1).into_buf(); + let len = buf.get_u8(); + + let mut hashes = Vec::with_capacity(len as usize); + let mut block_codec = BlockCodec::default(); + for _ in 0..len { + let hash = try_opt_dec!(block_codec.decode(src)?); + hashes.push(hash); + } + + Ok(Some(Locator { hashes: hashes })) + } +} + +impl MsgEncode for PeerError { + fn msg_encode(&self, dst: &mut BytesMut) -> Result< (), io::Error > { + dst.reserve(4); + dst.put_u32::(self.code); + + let bytes = self.message.as_bytes(); + dst.reserve(1 + bytes.len()); + dst.put_u8(bytes.len() as u8); + dst.put_slice(bytes); + Ok(()) + } +} + + +impl MsgDecode for PeerError { + fn msg_decode(src: &mut BytesMut) -> Result< Option, io::Error > { + // Reserve for code(4) and msg length(1) + if src.len() < 5 { + return Ok(None); + } + let mut buf = src.split_to(5).into_buf(); + let code = buf.get_u32::(); + + let str_len = buf.get_u8() as usize; + + if src.len() < str_len { + return Ok(None); + } + + let buf = src.split_to(str_len).into_buf(); + + let message = String::from_utf8(buf.collect()) + .map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid Error Message Field") + })?; + + Ok(Some( PeerError { + code: code, + message: message + })) + } +} + +impl MsgEncode for SocketAddr { + fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { + match *self { + SocketAddr::V4(sav4) => { + dst.reserve(7); + dst.put_u8(SOCKET_ADDR_MARKER_V4); + dst.put_slice(&sav4.ip().octets()); + dst.put_u16::(sav4.port()); + Ok(()) + } + SocketAddr::V6(sav6) => { + dst.reserve(19); + dst.put_u8(SOCKET_ADDR_MARKER_V6); + + for seg in &sav6.ip().segments() { + dst.put_u16::(*seg); + } + + dst.put_u16::(sav6.port()); + Ok(()) + } + } + } +} + +impl MsgDecode for SocketAddr { + fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { + if src.len() < 7 { + return Ok(None); + } + + let marker = src.split_to(1)[0]; + match marker { + SOCKET_ADDR_MARKER_V4 => { + let mut buf = src.split_to(6).into_buf(); + + // Get V4 address + let mut ip = [0; 4]; + buf.copy_to_slice(&mut ip); + + // Get port + let port = buf.get_u16::(); + + // Build v4 socket + let socket = SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port); + Ok(Some(SocketAddr::V4(socket))) + } + SOCKET_ADDR_MARKER_V6 => { + if src.len() < 18 { + return Ok(None); + } + let mut buf = src.split_to(18).into_buf(); + + // Get V6 address + let mut ip = [0u16; 8]; + for i in 0..8 { + ip[i] = buf.get_u16::(); + } + + // Get Port + let port = buf.get_u16::(); + + // Build V6 socket + let socket = SocketAddrV6::new(Ipv6Addr::new(ip[0], + ip[1], + ip[2], + ip[3], + ip[4], + ip[5], + ip[6], + ip[7]), + port, + 0, + 0); + + Ok(Some(SocketAddr::V6(socket))) + } + _ => Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Socket Marker")), + } + } +} + + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_encode_decode_ping() { + let mut codec = MsgCodec; + let ping = Message::Ping; + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(ping.clone(), &mut buf) + .expect("Expected to encode ping message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode ping message") + .unwrap(); + + + assert_eq!(ping, result); + } + + fn should_handle_incomplete_without_modifying_src_ping() { + let mut codec = MsgCodec; + let ping = Message::Ping; + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(ping.clone(), &mut buf) + .expect("Expected to encode ping message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode ping message") + .unwrap(); + assert_eq!(ping, result); + } + + + #[test] + fn should_encode_decode_pong() { + let mut codec = MsgCodec; + let pong = Message::Pong; + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(pong.clone(), &mut buf) + .expect("Expected to encode pong message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode pong message") + .unwrap(); + assert_eq!(pong, result); + } + + #[test] + fn should_encode_decode_hand() { + let mut codec = MsgCodec; + let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 8080)); + let hand = Message::Hand(Hand { + version: 0, + capabilities: UNKNOWN, + nonce: 0, + total_difficulty: Difficulty::one(), + sender_addr: sample_socket_addr.clone(), + receiver_addr: sample_socket_addr.clone(), + user_agent: "test".to_string(), + }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(hand.clone(), &mut buf) + .expect("Expected to encode hand message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode hand message") + .expect("Expected a full hand message"); + + assert_eq!(hand, result); + } + + #[test] + fn should_encode_decode_shake() { + let mut codec = MsgCodec; + let shake = Message::Shake(Shake { + version: 0, + capabilities: UNKNOWN, + total_difficulty: Difficulty::one(), + user_agent: "test".to_string(), + }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(shake.clone(), &mut buf) + .expect("Expected to encode shake message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode shake message") + .expect("Expected full shake message."); + + assert_eq!(shake, result); + } + + #[test] + fn should_encode_decode_get_peer_addrs() { + let mut codec = MsgCodec; + let get_peer_addrs = Message::GetPeerAddrs(GetPeerAddrs { capabilities: UNKNOWN }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(get_peer_addrs.clone(), &mut buf) + .expect("Expected to encode get peer addrs message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode get peer addrs message") + .expect("Expected full get peer addrs message."); + + assert_eq!(get_peer_addrs, result); + } + + #[test] + fn should_encode_decode_peer_addrs() { + let mut codec = MsgCodec; + let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 8000)); + let peer_addrs = Message::PeerAddrs(PeerAddrs { peers: vec![sample_socket_addr] }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(peer_addrs.clone(), &mut buf) + .expect("Expected to encode peer addrs message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode peer addrs message") + .expect("Expected full peer addrs message"); + + assert_eq!(peer_addrs, result); + } + + #[test] + fn should_encode_decode_headers() { + let mut codec = MsgCodec; + let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 8000)); + + let headers = Message::Headers(Headers { headers: vec![BlockHeader::default()] }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(headers.clone(), &mut buf) + .expect("Expected to encode headers message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode headers message") + .expect("Expected full headers message"); + + assert_eq!(headers, result); + } + + #[test] + fn should_encode_decode_get_headers() { + let mut codec = MsgCodec; + let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 8000)); + + let get_headers = Message::GetHeaders(Locator { hashes: vec![Hash([1; 32])] }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(get_headers.clone(), &mut buf) + .expect("Expected to encode get headers msg"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode get headers msg") + .unwrap(); + + assert_eq!(get_headers, result); + } + + #[test] + fn should_encode_decode_get_block() { + let mut codec = MsgCodec; + + let get_block = Message::GetBlock(Hash([1; 32])); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(get_block.clone(), &mut buf) + .expect("Expected to encode hand"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode hand") + .unwrap(); + + assert_eq!(get_block, result); + } + + #[test] + fn should_encode_decode_block() { + let mut codec = MsgCodec; + + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let kernel = TxKernel { + features: KernelFeatures::empty(), + excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + excess_sig: vec![1; 10], + fee: 100, + }; + + let new_block = Block { + header: BlockHeader::default(), + inputs: vec![input], + outputs: vec![output], + kernels: vec![kernel], + }; + + let block = Message::Block(new_block); + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(block.clone(), &mut buf) + .expect("Expected to encode"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode") + .unwrap(); + + assert_eq!(block, result); + } + + #[test] + fn should_encode_decode_transaction() { + let mut codec = MsgCodec; + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let transaction = Message::Transaction(Transaction { + inputs: vec![input], + outputs: vec![output], + fee: 1 as u64, + excess_sig: vec![0; 10], + }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(transaction.clone(), &mut buf) + .expect("Expected to encode transaction message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode transaction message") + .unwrap(); + + assert_eq!(transaction, result); + } + + #[test] + fn should_encode_decode_error() { + let mut codec = MsgCodec; + + let error = Message::PeerError(PeerError { + code: 0, + message: "Uhoh".to_owned(), + }); + + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(error.clone(), &mut buf) + .expect("Expected to encode error message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode error message") + .unwrap(); + + assert_eq!(error, result); + } +} diff --git a/p2p/src/peer_codec.rs b/p2p/src/peer_codec.rs new file mode 100644 index 000000000..221487e27 --- /dev/null +++ b/p2p/src/peer_codec.rs @@ -0,0 +1,144 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementation of the peer data encoding and decoding + +use std::io; +use std::net::{SocketAddr, Ipv4Addr, Ipv6Addr, IpAddr}; + +use tokio_io::*; +use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; +use tokio_io::codec::{Encoder, Decoder}; +use enum_primitive::FromPrimitive; + +use types::*; +use msg_codec::{MsgDecode, MsgEncode}; +use store::{State, PeerData}; + +// Convenience Macro for Option Handling in Decoding +macro_rules! try_opt_dec { + ($e: expr) => (match $e { + Some(val) => val, + None => return Ok(None), + }); +} + +/// Codec for Decoding and Encoding a `PeerData` +#[derive(Debug, Clone, Default)] +pub struct PeerCodec; + +impl codec::Encoder for PeerCodec { + type Item = PeerData; + type Error = io::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + // Put socket address as u32 + MsgEncode::msg_encode(&item.addr, dst)?; + + // Put capabilities + dst.reserve(4); + dst.put_u32::(item.capabilities.bits()); + + // Put user agent string with u8 length first + let str_bytes = item.user_agent.as_bytes(); + dst.reserve(str_bytes.len() + 1); + dst.put_u8(str_bytes.len() as u8); + dst.put_slice(str_bytes); + + // Put flags + dst.reserve(1); + dst.put_u8(item.flags as u8); + + Ok(()) + } +} + +impl codec::Decoder for PeerCodec { + type Item = PeerData; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // Create Temporary Buffer + let ref mut temp_src = src.clone(); + + // Get socket address + let addr = try_opt_dec!(SocketAddr::msg_decode(temp_src)?); + + // Check for capabilites flags(4), user agent header(1) + if temp_src.len() < 5 { + return Ok(None); + } + + // Get capabilites + let mut buf = temp_src.split_to(5).into_buf(); + let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); + + // Check for user agent length(str_len) and flags(1) + let str_len = buf.get_u8() as usize; + if temp_src.len() < str_len + 1 { + return Ok(None); + } + + // Get User Agent + let buf = temp_src.split_to(str_len).into_buf(); + let user_agent = String::from_utf8(buf.collect()) + .map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version") + })?; + + // Get flags + let mut buf = temp_src.split_to(1).into_buf(); + let flags_data = buf.get_u8(); + let flags = State::from_u8(flags_data) + .ok_or(io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version"))?; + + // If succesfull truncate src by bytes read from temp_src; + let diff = src.len() - temp_src.len(); + src.split_to(diff); + + Ok(Some(PeerData { + addr: addr, + capabilities: capabilities, + user_agent: user_agent, + flags: flags, + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn should_encode_decode_peer_data() { + let mut codec = PeerCodec; + let peer_data = PeerData { + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000), + capabilities: UNKNOWN, + user_agent: "foo".to_string(), + flags: State::Healthy, + }; + let mut buf = BytesMut::with_capacity(0); + + codec + .encode(peer_data.clone(), &mut buf) + .expect("Expected to encode peer data message"); + + let result = codec + .decode(&mut buf) + .expect("Expected no Errors to decode peer data message") + .unwrap(); + + assert_eq!(peer_data, result); + } +} \ No newline at end of file diff --git a/p2p/src/proto.rs b/p2p/src/proto.rs index cadd61faa..fbfdcb709 100644 --- a/p2p/src/proto.rs +++ b/p2p/src/proto.rs @@ -44,7 +44,8 @@ impl Codec for GrinCodec { fn encode(&mut self, msg: Self::In, mut buf: &mut Vec) -> io::Result<()> { match msg { Frame::Message{id, message, ..} => { - ser::serialize(&mut buf, &message).map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Encoding error: {:?}", e)))?; + ser::serialize(&mut buf, &message) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Encoding error: {:?}", e)))?; }, Frame::Body{id, chunk} => { if let Some(chunk) = chunk { diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 59a7b03f0..34b2883ee 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -22,8 +22,10 @@ use grin_store::{self, Error, to_key, option_to_not_found}; use msg::SockAddr; use types::Capabilities; -const STORE_SUBPATH: &'static str = "peers"; +use tokio_io::codec::{Encoder, Decoder}; +use peer_codec::PeerCodec; +const STORE_SUBPATH: &'static str = "peers"; const PEER_PREFIX: u8 = 'p' as u8; /// Types of messages @@ -37,7 +39,7 @@ enum_from_primitive! { } /// Data stored for any given peer we've encountered. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct PeerData { /// Network address of the peer. pub addr: SocketAddr, @@ -50,37 +52,6 @@ pub struct PeerData { pub flags: State, } -impl Writeable for PeerData { - fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - SockAddr(self.addr).write(writer)?; - ser_multiwrite!(writer, - [write_u32, self.capabilities.bits()], - [write_bytes, &self.user_agent], - [write_u8, self.flags as u8]); - Ok(()) - } -} - -impl Readable for PeerData { - fn read(reader: &mut Reader) -> Result { - let addr = SockAddr::read(reader)?; - let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); - let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; - let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; - match State::from_u8(fl) { - Some(flags) => { - Ok(PeerData { - addr: addr.0, - capabilities: capabilities, - user_agent: user_agent, - flags: flags, - }) - } - None => Err(ser::Error::CorruptedData), - } - } -} - /// Storage facility for peer data. pub struct PeerStore { db: grin_store::Store, @@ -94,25 +65,30 @@ impl PeerStore { } pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { - self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], - p) + let key = to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes()); + self.db.put_enc(&mut PeerCodec, &key, p.clone()) } fn get_peer(&self, peer_addr: SocketAddr) -> Result { - option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..])) + let key = peer_key(peer_addr); + option_to_not_found(self.db.get_dec(&mut PeerCodec, &key)) } pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { - self.db.exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + let key = peer_key(peer_addr); + self.db.exists(&key) } pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { - self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + let key = peer_key(peer_addr); + self.db.delete(&key) } pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - let peers_iter = self.db - .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); + + let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes()); + let peers_iter = self.db.iter_dec(PeerCodec::default(), &key); + let mut peers = Vec::with_capacity(count); for p in peers_iter { if p.flags == state && p.capabilities.contains(cap) { diff --git a/secp256k1zkp/src/pedersen.rs b/secp256k1zkp/src/pedersen.rs index caf760f28..91c58c7a4 100644 --- a/secp256k1zkp/src/pedersen.rs +++ b/secp256k1zkp/src/pedersen.rs @@ -53,6 +53,12 @@ pub struct RangeProof { pub plen: usize, } +impl PartialEq for RangeProof { + fn eq(&self, other: &Self) -> bool { + self.proof.as_ref() == other.proof.as_ref() + } +} + impl Clone for RangeProof { #[inline] fn clone(&self) -> RangeProof { diff --git a/store/src/codec/block.rs b/store/src/codec/block.rs index 602608ae1..e37ce55bb 100644 --- a/store/src/codec/block.rs +++ b/store/src/codec/block.rs @@ -19,6 +19,7 @@ use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; use num_bigint::BigUint; use time::Timespec; use time; +use std::marker::PhantomData; use core::core::{Input, Output, Proof, TxKernel, Block, BlockHeader}; use core::core::hash::Hash; @@ -38,34 +39,83 @@ macro_rules! try_opt_dec { }); } -#[derive(Debug, Clone)] -pub struct BlockCodec; +/// Internal Convenience Trait +pub trait BlockEncode: Sized { + fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; +} -impl codec::Encoder for BlockCodec { - type Item = Block; +/// Internal Convenience Trait +pub trait BlockDecode: Sized { + fn block_decode(src: &mut BytesMut) -> Result, io::Error>; +} + +/// Decodes and encodes `Block`s and their subtypes +#[derive(Debug, Clone)] +pub struct BlockCodec { + phantom: PhantomData, +} + +impl Default for BlockCodec + where T: BlockDecode + BlockEncode +{ + fn default() -> Self { + BlockCodec { phantom: PhantomData } + } +} + +impl codec::Encoder for BlockCodec + where T: BlockDecode + BlockEncode +{ + type Item = T; type Error = io::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + T::block_encode(&item, dst) + } +} + +impl codec::Decoder for BlockCodec + where T: BlockDecode + BlockEncode +{ + type Item = T; + type Error = io::Error; + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // Create Temporary Buffer + let ref mut temp = src.clone(); + let res = try_opt_dec!(T::block_decode(temp)?); + + // If succesfull truncate src by bytes read from src; + let diff = src.len() - temp.len(); + src.split_to(diff); + + // Return Item + Ok(Some(res)) + } +} + + +impl BlockEncode for Block { + fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { // Put Header - item.header.block_encode(dst)?; + self.header.block_encode(dst)?; // Put Lengths of Inputs, Outputs and Kernels in 3 u64's dst.reserve(24); - dst.put_u64::(item.inputs.len() as u64); - dst.put_u64::(item.outputs.len() as u64); - dst.put_u64::(item.kernels.len() as u64); + dst.put_u64::(self.inputs.len() as u64); + dst.put_u64::(self.outputs.len() as u64); + dst.put_u64::(self.kernels.len() as u64); // Put Inputs - for inp in &item.inputs { + for inp in &self.inputs { inp.block_encode(dst)?; } // Put Outputs - for outp in &item.outputs { + for outp in &self.outputs { outp.block_encode(dst)?; } // Put TxKernels - for proof in &item.kernels { + for proof in &self.kernels { proof.block_encode(dst)?; } @@ -73,10 +123,9 @@ impl codec::Encoder for BlockCodec { } } -impl codec::Decoder for BlockCodec { - type Item = Block; - type Error = io::Error; - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { +impl BlockDecode for Block { + fn block_decode(src: &mut BytesMut) -> Result, io::Error> { + // Get Header let header = try_opt_dec!(BlockHeader::block_decode(src)?); @@ -129,16 +178,6 @@ impl codec::Encoder for BlockHasher { } } -/// Internal Convenience Trait -trait BlockEncode: Sized { - fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; -} - -/// Internal Convenience Trait -trait BlockDecode: Sized { - fn block_decode(src: &mut BytesMut) -> Result, io::Error>; -} - impl BlockEncode for BlockHeader { fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { partial_block_encode(self, dst)?; @@ -462,195 +501,3 @@ impl BlockDecode for Proof { Ok(Some(Proof(proof_data))) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn should_have_block_codec_roundtrip() { - use tokio_io::codec::{Encoder, Decoder}; - - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let kernel = TxKernel { - features: KernelFeatures::empty(), - excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - excess_sig: vec![1; 10], - fee: 100, - }; - - let block = Block { - header: BlockHeader::default(), - inputs: vec![input], - outputs: vec![output], - kernels: vec![kernel], - }; - - let mut buf = BytesMut::with_capacity(0); - let mut codec = BlockCodec {}; - codec.encode(block.clone(), &mut buf).expect("Error During Block Encoding"); - - let d_block = - codec.decode(&mut buf).expect("Error During Block Decoding").expect("Unfinished Block"); - - assert_eq!(block.header.height, d_block.header.height); - assert_eq!(block.header.previous, d_block.header.previous); - assert_eq!(block.header.timestamp, d_block.header.timestamp); - assert_eq!(block.header.cuckoo_len, d_block.header.cuckoo_len); - assert_eq!(block.header.utxo_merkle, d_block.header.utxo_merkle); - assert_eq!(block.header.tx_merkle, d_block.header.tx_merkle); - assert_eq!(block.header.features, d_block.header.features); - assert_eq!(block.header.nonce, d_block.header.nonce); - assert_eq!(block.header.pow, d_block.header.pow); - assert_eq!(block.header.difficulty, d_block.header.difficulty); - assert_eq!(block.header.total_difficulty, - d_block.header.total_difficulty); - - assert_eq!(block.inputs[0].commitment(), d_block.inputs[0].commitment()); - - assert_eq!(block.outputs[0].features, d_block.outputs[0].features); - assert_eq!(block.outputs[0].proof().as_ref(), - d_block.outputs[0].proof().as_ref()); - assert_eq!(block.outputs[0].commitment(), - d_block.outputs[0].commitment()); - - assert_eq!(block.kernels[0].features, d_block.kernels[0].features); - assert_eq!(block.kernels[0].excess, d_block.kernels[0].excess); - assert_eq!(block.kernels[0].excess_sig, d_block.kernels[0].excess_sig); - assert_eq!(block.kernels[0].fee, d_block.kernels[0].fee); - - } - - #[test] - fn should_encode_and_decode_blockheader() { - - let block_header = BlockHeader::default(); - - let mut buf = BytesMut::with_capacity(0); - block_header.block_encode(&mut buf); - - let d_block_header = BlockHeader::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(block_header.height, d_block_header.height); - assert_eq!(block_header.previous, d_block_header.previous); - assert_eq!(block_header.timestamp, d_block_header.timestamp); - assert_eq!(block_header.cuckoo_len, d_block_header.cuckoo_len); - assert_eq!(block_header.utxo_merkle, d_block_header.utxo_merkle); - assert_eq!(block_header.tx_merkle, d_block_header.tx_merkle); - assert_eq!(block_header.features, d_block_header.features); - assert_eq!(block_header.nonce, d_block_header.nonce); - assert_eq!(block_header.pow, d_block_header.pow); - assert_eq!(block_header.difficulty, d_block_header.difficulty); - assert_eq!(block_header.total_difficulty, - d_block_header.total_difficulty); - - } - - - #[test] - fn should_encode_and_decode_input() { - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let mut buf = BytesMut::with_capacity(0); - input.block_encode(&mut buf); - - assert_eq!([1; PEDERSEN_COMMITMENT_SIZE].as_ref(), buf); - assert_eq!(input.commitment(), - Input::block_decode(&mut buf) - .unwrap() - .unwrap() - .commitment()); - } - - #[test] - fn should_encode_and_decode_output() { - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let mut buf = BytesMut::with_capacity(0); - output.block_encode(&mut buf); - - let d_output = Output::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(output.features, d_output.features); - assert_eq!(output.proof().as_ref(), d_output.proof().as_ref()); - assert_eq!(output.commitment(), d_output.commitment()); - - } - - #[test] - fn should_encode_and_decode_txkernel() { - - let kernel = TxKernel { - features: KernelFeatures::empty(), - excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - excess_sig: vec![1; 10], - fee: 100, - }; - - let mut buf = BytesMut::with_capacity(0); - kernel.block_encode(&mut buf); - - let d_kernel = TxKernel::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(kernel.features, d_kernel.features); - assert_eq!(kernel.excess, d_kernel.excess); - assert_eq!(kernel.excess_sig, d_kernel.excess_sig); - assert_eq!(kernel.fee, d_kernel.fee); - } - - #[test] - fn should_encode_and_decode_difficulty() { - - let difficulty = Difficulty::from_num(1000); - - let mut buf = BytesMut::with_capacity(0); - difficulty.block_encode(&mut buf); - - let d_difficulty = Difficulty::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(difficulty, d_difficulty); - } - - #[test] - fn should_encode_and_decode_hash() { - - let hash = Hash([1u8; 32]); - - let mut buf = BytesMut::with_capacity(0); - hash.block_encode(&mut buf); - - let d_hash = Hash::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(hash, d_hash); - } - - #[test] - fn should_encode_and_decode_proof() { - - let proof = Proof::zero(); - - let mut buf = BytesMut::with_capacity(0); - proof.block_encode(&mut buf); - - let d_proof = Proof::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(proof, d_proof); - } -} \ No newline at end of file diff --git a/store/src/codec/block_test.rs b/store/src/codec/block_test.rs new file mode 100644 index 000000000..b512ff44a --- /dev/null +++ b/store/src/codec/block_test.rs @@ -0,0 +1,185 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; + +use tokio_io::*; +use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; +use num_bigint::BigUint; +use time::Timespec; +use time; +use std::marker::PhantomData; + +use core::core::{Input, Output, Proof, TxKernel, Block, BlockHeader}; +use core::core::hash::Hash; +use core::core::target::Difficulty; +use core::core::transaction::{OutputFeatures, KernelFeatures}; +use core::core::block::BlockFeatures; +use core::consensus::PROOFSIZE; + +use secp::pedersen::{RangeProof, Commitment}; +use secp::constants::PEDERSEN_COMMITMENT_SIZE; + +use super::block::*; + +#[test] +fn should_have_block_codec_roundtrip() { + use tokio_io::codec::{Encoder, Decoder}; + + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let kernel = TxKernel { + features: KernelFeatures::empty(), + excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + excess_sig: vec![1; 10], + fee: 100, + }; + + let block = Block { + header: BlockHeader::default(), + inputs: vec![input], + outputs: vec![output], + kernels: vec![kernel], + }; + + let mut buf = BytesMut::with_capacity(0); + let mut codec = BlockCodec::default(); + codec + .encode(block.clone(), &mut buf) + .expect("Error During Block Encoding"); + + let d_block = codec + .decode(&mut buf) + .expect("Error During Block Decoding") + .expect("Unfinished Block"); + + // Check if all bytes are read + assert_eq!(buf.len(), 0); + assert_eq!(block, d_block); + +} + +#[test] +fn should_have_block_header_codec_roundtrip() { + use tokio_io::codec::{Encoder, Decoder}; + + let mut codec = BlockCodec::default(); + let block_header = BlockHeader::default(); + + let mut buf = BytesMut::with_capacity(0); + codec.encode(block_header.clone(), &mut buf); + + let d_block_header = codec.decode(&mut buf).unwrap().unwrap(); + + assert_eq!(block_header, d_block_header); + +} + + +#[test] +fn should_encode_and_decode_input() { + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let mut buf = BytesMut::with_capacity(0); + input.block_encode(&mut buf); + + let d_input = Input::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(input,d_input) +} + +#[test] +fn should_encode_and_decode_output() { + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let mut buf = BytesMut::with_capacity(0); + output.block_encode(&mut buf); + + let d_output = Output::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(output, d_output); +} + +#[test] +fn should_encode_and_decode_txkernel() { + + let kernel = TxKernel { + features: KernelFeatures::empty(), + excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + excess_sig: vec![1; 10], + fee: 100, + }; + + let mut buf = BytesMut::with_capacity(0); + kernel.block_encode(&mut buf); + + let d_kernel = TxKernel::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(kernel, d_kernel); +} + +#[test] +fn should_encode_and_decode_difficulty() { + + let difficulty = Difficulty::from_num(1000); + + let mut buf = BytesMut::with_capacity(0); + difficulty.block_encode(&mut buf); + + let d_difficulty = Difficulty::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(difficulty, d_difficulty); +} + +#[test] +fn should_encode_and_decode_hash() { + + let hash = Hash([1u8; 32]); + + let mut buf = BytesMut::with_capacity(0); + hash.block_encode(&mut buf); + + let d_hash = Hash::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(hash, d_hash); +} + +#[test] +fn should_encode_and_decode_proof() { + + let proof = Proof::zero(); + + let mut buf = BytesMut::with_capacity(0); + proof.block_encode(&mut buf); + + let d_proof = Proof::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(proof, d_proof); +} diff --git a/store/src/codec/mod.rs b/store/src/codec/mod.rs index 97901f273..05921491c 100644 --- a/store/src/codec/mod.rs +++ b/store/src/codec/mod.rs @@ -12,9 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Codecs for Blocks and Transactions + pub mod block; pub mod tx; -pub use self::block::{BlockCodec, BlockHasher}; -pub use self::tx::TxCodec; +#[cfg(test)] +mod block_test; +#[cfg(test)] +mod tx_test; +pub use self::block::{BlockCodec, BlockHasher }; +pub use self::tx::TxCodec; diff --git a/store/src/codec/tx.rs b/store/src/codec/tx.rs index 72bca4a1f..27d19f729 100644 --- a/store/src/codec/tx.rs +++ b/store/src/codec/tx.rs @@ -31,7 +31,8 @@ macro_rules! try_opt_dec { }); } -#[derive(Debug, Clone)] +/// Decodes and encodes `Transaction`s +#[derive(Debug, Clone, Default)] pub struct TxCodec; impl codec::Encoder for TxCodec { @@ -121,12 +122,12 @@ impl codec::Decoder for TxCodec { } /// Internal Convenience Trait -trait TxEncode: Sized { +pub trait TxEncode: Sized { fn tx_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; } /// Internal Convenience Trait -trait TxDecode: Sized { +pub trait TxDecode: Sized { fn tx_decode(src: &mut BytesMut) -> Result, io::Error>; } @@ -187,85 +188,4 @@ impl TxDecode for Input { Ok(Some(Input(Commitment(c)))) } -} - - - - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn should_have_tx_codec_roundtrip() { - use tokio_io::codec::{Encoder, Decoder}; - - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let tx = Transaction { - inputs: vec![input], - outputs: vec![output], - fee: 0, - excess_sig: vec![0; 10], - }; - - let mut buf = BytesMut::with_capacity(0); - let mut codec = TxCodec {}; - codec.encode(tx.clone(), &mut buf).expect("Error During Transaction Encoding"); - - let d_tx = codec.decode(&mut buf) - .expect("Error During Transaction Decoding") - .expect("Unfinished Transaction"); - - assert_eq!(tx.inputs[0].commitment(), d_tx.inputs[0].commitment()); - assert_eq!(tx.outputs[0].features, d_tx.outputs[0].features); - assert_eq!(tx.fee, d_tx.fee); - assert_eq!(tx.excess_sig, d_tx.excess_sig); - } - - #[test] - fn should_encode_and_decode_output() { - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let mut buf = BytesMut::with_capacity(0); - output.tx_encode(&mut buf); - - let d_output = Output::tx_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(output.features, d_output.features); - assert_eq!(output.proof().as_ref(), d_output.proof().as_ref()); - assert_eq!(output.commitment(), d_output.commitment()); - - } - - #[test] - fn should_encode_and_decode_input() { - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let mut buf = BytesMut::with_capacity(0); - input.tx_encode(&mut buf); - - assert_eq!([1; PEDERSEN_COMMITMENT_SIZE].as_ref(), buf); - assert_eq!(input.commitment(), - Input::tx_decode(&mut buf) - .unwrap() - .unwrap() - .commitment()); - } -} +} \ No newline at end of file diff --git a/store/src/codec/tx_test.rs b/store/src/codec/tx_test.rs new file mode 100644 index 000000000..da1d7a882 --- /dev/null +++ b/store/src/codec/tx_test.rs @@ -0,0 +1,93 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; + +use tokio_io::*; +use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; + +use core::core::{Input, Output, Transaction}; +use core::core::transaction::OutputFeatures; + +use secp::pedersen::{RangeProof, Commitment}; +use secp::constants::PEDERSEN_COMMITMENT_SIZE; + +use super::tx::*; + +#[test] +fn should_have_tx_codec_roundtrip() { + use tokio_io::codec::{Encoder, Decoder}; + + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let tx = Transaction { + inputs: vec![input], + outputs: vec![output], + fee: 0, + excess_sig: vec![0; 10], + }; + + let mut buf = BytesMut::with_capacity(0); + let mut codec = TxCodec {}; + codec + .encode(tx.clone(), &mut buf) + .expect("Error During Transaction Encoding"); + + let d_tx = codec + .decode(&mut buf) + .expect("Error During Transaction Decoding") + .expect("Unfinished Transaction"); + + assert_eq!(tx, d_tx); +} + +#[test] +fn should_encode_and_decode_output() { + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let mut buf = BytesMut::with_capacity(0); + output.tx_encode(&mut buf); + + let d_output = Output::tx_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(output, d_output); +} + +#[test] +fn should_encode_and_decode_input() { + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let mut buf = BytesMut::with_capacity(0); + input.tx_encode(&mut buf); + + let d_input = Input::tx_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(input, d_input); +} diff --git a/store/src/lib.rs b/store/src/lib.rs index 75f20e10f..42831edfa 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -35,13 +35,16 @@ use std::fmt; use std::iter::Iterator; use std::marker::PhantomData; use std::sync::RwLock; +use tokio_io::codec::{Encoder,Decoder}; +use bytes::BytesMut; +use bytes::buf::{FromBuf, IntoBuf}; use byteorder::{WriteBytesExt, BigEndian}; use rocksdb::{DB, WriteBatch, DBCompactionStyle, DBIterator, IteratorMode, Direction}; use core::ser; -mod codec; +pub mod codec; use codec::{BlockCodec, BlockHasher, TxCodec}; /// Main error type for this crate. @@ -54,6 +57,8 @@ pub enum Error { RocksDbErr(String), /// Wraps a serialization error for Writeable or Readable SerErr(ser::Error), + /// Wraps an Io Error + Io(std::io::Error), } impl fmt::Display for Error { @@ -62,6 +67,7 @@ impl fmt::Display for Error { &Error::NotFoundErr => write!(f, "Not Found"), &Error::RocksDbErr(ref s) => write!(f, "RocksDb Error: {}", s), &Error::SerErr(ref e) => write!(f, "Serialization Error: {}", e.to_string()), + &Error::Io(ref e) => write!(f, "Codec Error: {}", e) } } } @@ -72,6 +78,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: std::io::Error) -> Error { + Error::Io(e) + } +} + /// Thread-safe rocksdb wrapper pub struct Store { rdb: RwLock, @@ -98,13 +110,25 @@ impl Store { db.put(key, &value[..]).map_err(&From::from) } - /// Writes a single key and its `Writeable` value to the db. Encapsulates - /// serialization. - pub fn put_ser(&self, key: &[u8], value: &W) -> Result<(), Error> { - let ser_value = ser::ser_vec(value); - match ser_value { - Ok(data) => self.put(key, data), - Err(err) => Err(Error::SerErr(err)), + /// Writes a single key and a value using a given encoder. + pub fn put_enc(&self, encoder: &mut E, key: &[u8], value: E::Item) -> Result<(), Error> + where Error: From { + + let mut data = BytesMut::with_capacity(0); + encoder.encode(value, &mut data)?; + self.put(key, data.to_vec()) + } + + /// Gets a value from the db, provided its key and corresponding decoder + pub fn get_dec(&self, decoder: &mut D, key: &[u8]) -> Result, Error> + where Error: From { + + let data = self.get(key)?; + if let Some(buf) = data { + let mut buf = BytesMut::from_buf(buf); + decoder.decode(&mut buf).map_err(From::from) + } else { + Ok(None) } } @@ -114,30 +138,6 @@ impl Store { db.get(key).map(|r| r.map(|o| o.to_vec())).map_err(From::from) } - /// Gets a `Readable` value from the db, provided its key. Encapsulates - /// serialization. - pub fn get_ser(&self, key: &[u8]) -> Result, Error> { - self.get_ser_limited(key, 0) - } - - /// Gets a `Readable` value from the db, provided its key, allowing to - /// extract only partial data. The underlying Readable size must align - /// accordingly. Encapsulates serialization. - pub fn get_ser_limited(&self, - key: &[u8], - len: usize) - -> Result, Error> { - let data = try!(self.get(key)); - match data { - Some(val) => { - let mut lval = if len > 0 { &val[..len] } else { &val[..] }; - let r = try!(ser::deserialize(&mut lval).map_err(Error::SerErr)); - Ok(Some(r)) - } - None => Ok(None), - } - } - /// Whether the provided key exists pub fn exists(&self, key: &[u8]) -> Result { let db = self.rdb.read().unwrap(); @@ -150,17 +150,15 @@ impl Store { db.delete(key).map_err(From::from) } - /// Produces an iterator of `Readable` types moving forward from the - /// provided - /// key. - pub fn iter(&self, from: &[u8]) -> SerIterator { + /// Produces an iterator of items decoded by a decoder moving forward from the provided key. + pub fn iter_dec(&self, codec: D, from: &[u8]) -> DecIterator { let db = self.rdb.read().unwrap(); - SerIterator { + DecIterator { iter: db.iterator(IteratorMode::From(from, Direction::Forward)), - _marker: PhantomData, + codec: codec } } - + /// Builds a new batch to be used with this store. pub fn batch(&self) -> Batch { Batch { @@ -182,17 +180,13 @@ pub struct Batch<'a> { } impl<'a> Batch<'a> { - /// Writes a single key and its `Writeable` value to the batch. The write - /// function must be called to "commit" the batch to storage. - pub fn put_ser(mut self, key: &[u8], value: &W) -> Result, Error> { - let ser_value = ser::ser_vec(value); - match ser_value { - Ok(data) => { - self.batch.put(key, &data[..])?; - Ok(self) - } - Err(err) => Err(Error::SerErr(err)), - } + + /// Using a given encoder, Writes a single key and a value to the batch. + pub fn put_enc(mut self, encoder: &mut E, key: &[u8], value: E::Item) -> Result, Error> where Error: From { + let mut data = BytesMut::with_capacity(0); + encoder.encode(value, &mut data)?; + self.batch.put(key, &data)?; + Ok(self) } /// Writes the batch to RocksDb. @@ -201,29 +195,24 @@ impl<'a> Batch<'a> { } } -/// An iterator thad produces Readable instances back. Wraps the lower level -/// DBIterator and deserializes the returned values. -pub struct SerIterator - where T: ser::Readable -{ +/// An iterator that produces items from a `DBIterator` instance with a given `Decoder`. +/// Iterates and decodes returned values +pub struct DecIterator where D: Decoder { iter: DBIterator, - _marker: PhantomData, + codec: D } -impl Iterator for SerIterator - where T: ser::Readable -{ - type Item = T; - - fn next(&mut self) -> Option { +impl Iterator for DecIterator where D: Decoder { + type Item = D::Item; + fn next(&mut self) -> Option { let next = self.iter.next(); - next.and_then(|r| { - let (_, v) = r; - ser::deserialize(&mut &v[..]).ok() - }) + next.and_then(|(_, v)| { + self.codec.decode(&mut BytesMut::from(v.as_ref())).ok() + }).unwrap_or(None) } } + /// Build a db key from a prefix and a byte vector identifier. pub fn to_key(prefix: u8, k: &mut Vec) -> Vec { let mut res = Vec::with_capacity(k.len() + 2);