diff --git a/chain/Cargo.toml b/chain/Cargo.toml index ea776c5b7..1458fdb33 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -11,9 +11,6 @@ log = "^0.3" serde = "~1.0.8" serde_derive = "~1.0.8" time = "^0.1" -tokio-io = "0.1.1" -bytes = "0.4.2" -num-bigint = "^0.1.35" grin_core = { path = "../core" } grin_store = { path = "../store" } diff --git a/chain/src/codec.rs b/chain/src/codec.rs deleted file mode 100644 index 07247ca9d..000000000 --- a/chain/src/codec.rs +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Implementation of the chain block encoding and decoding. - -use std::io; - -use tokio_io::*; -use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; -use num_bigint::BigUint; - -use types::Tip; -use core::core::hash::Hash; -use core::core::target::Difficulty; - -// Convenience Macro for Option Handling in Decoding -macro_rules! try_opt_dec { - ($e: expr) => (match $e { - Some(val) => val, - None => return Ok(None), - }); -} - -/// Codec for Decoding and Encoding a `Tip` -#[derive(Debug, Clone, Default)] -pub struct ChainCodec; - -impl codec::Encoder for ChainCodec { - type Item = Tip; - type Error = io::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - // Put Height - dst.reserve(8); - dst.put_u64::(item.height); - - // Put Last Block Hash - item.last_block_h.chain_encode(dst)?; - - // Put Previous Block Hash - item.prev_block_h.chain_encode(dst)?; - - // Put Difficulty - item.total_difficulty.chain_encode(dst)?; - - Ok(()) - } -} - -impl codec::Decoder for ChainCodec { - type Item = Tip; - type Error = io::Error; - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - - // Create Temporary Buffer - let ref mut temp = src.clone(); - - // Get Height - if temp.len() < 8 { - return Ok(None); - } - let mut buf = temp.split_to(8).into_buf(); - let height = buf.get_u64::(); - - // Get Last Block Hash - let last_block_h = try_opt_dec!(Hash::chain_decode(temp)?); - - // Get Previous Block Hash - let prev_block_h = try_opt_dec!(Hash::chain_decode(temp)?); - - // Get Difficulty - let total_difficulty = try_opt_dec!(Difficulty::chain_decode(temp)?); - - // If succesfull truncate src by bytes read from temp; - let diff = src.len() - temp.len(); - src.split_to(diff); - - Ok(Some(Tip { - height: height, - last_block_h: last_block_h, - prev_block_h: prev_block_h, - total_difficulty: total_difficulty, - })) - } -} - -/// Internal Convenience Trait -trait ChainEncode: Sized { - fn chain_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; -} - -/// Internal Convenience Trait -trait ChainDecode: Sized { - fn chain_decode(src: &mut BytesMut) -> Result, io::Error>; -} - -impl ChainEncode for Difficulty { - fn chain_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - let data = self.clone().into_biguint().to_bytes_be(); - dst.reserve(1 + data.len()); - dst.put_u8(data.len() as u8); - dst.put_slice(&data); - Ok(()) - } -} - -impl ChainDecode for Difficulty { - fn chain_decode(src: &mut BytesMut) -> Result, io::Error> { - if src.len() < 1 { - return Ok(None); - } - let mut buf = src.split_to(1).into_buf(); - let dlen = buf.get_u8() as usize; - - if src.len() < dlen { - return Ok(None); - } - - let buf = src.split_to(dlen).into_buf(); - let data = Buf::bytes(&buf); - - Ok(Some(Difficulty::from_biguint(BigUint::from_bytes_be(data)))) - } -} - -impl ChainEncode for Hash { - fn chain_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - dst.reserve(32); - dst.put_slice(self.as_ref()); - Ok(()) - } -} - -impl ChainDecode for Hash { - fn chain_decode(src: &mut BytesMut) -> Result, io::Error> { - if src.len() < 32 { - return Ok(None); - } - - let mut buf = src.split_to(32).into_buf(); - let mut hash_data = [0; 32]; - buf.copy_to_slice(&mut hash_data); - - Ok(Some(Hash(hash_data))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn should_have_chain_codec_roundtrip() { - use tokio_io::codec::{Encoder, Decoder}; - - let sample_gdb = Hash([1u8; 32]); - let tip = Tip::new(sample_gdb); - - let mut buf = BytesMut::with_capacity(0); - let mut codec = ChainCodec {}; - codec.encode(tip.clone(), &mut buf).expect("Error During Tip Encoding"); - - let d_tip = - codec.decode(&mut buf).expect("Error During Tip Decoding").expect("Unfinished Tip"); - - // Check if all bytes are read - assert_eq!(buf.len(), 0); - - assert_eq!(tip.height, d_tip.height); - assert_eq!(tip.last_block_h, d_tip.last_block_h); - assert_eq!(tip.prev_block_h, d_tip.prev_block_h); - assert_eq!(tip.total_difficulty, d_tip.total_difficulty); - } -} diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 15df331f2..03f966941 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -29,9 +29,6 @@ extern crate serde; #[macro_use] extern crate serde_derive; extern crate time; -extern crate tokio_io; -extern crate bytes; -extern crate num_bigint; extern crate grin_core as core; extern crate grin_store; @@ -40,7 +37,6 @@ extern crate secp256k1zkp as secp; pub mod pipe; pub mod store; pub mod types; -pub mod codec; // Re-export the base interface diff --git a/chain/src/store.rs b/chain/src/store.rs index e20d6bd63..71940940a 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -17,11 +17,9 @@ use secp::pedersen::Commitment; use types::*; -use codec::ChainCodec; use core::core::hash::{Hash, Hashed}; use core::core::{Block, BlockHeader, Output}; use grin_store::{self, Error, to_key, u64_to_key, option_to_not_found}; -use grin_store::codec::BlockCodec; const STORE_SUBPATH: &'static str = "chain"; @@ -41,49 +39,46 @@ pub struct ChainKVStore { impl ChainKVStore { pub fn new(root_path: String) -> Result { let db = grin_store::Store::open(format!("{}/{}", root_path, STORE_SUBPATH).as_str())?; - let codec = ChainCodec::default(); Ok(ChainKVStore { db: db }) } } impl ChainStore for ChainKVStore { fn head(&self) -> Result { - option_to_not_found(self.db.get_dec(&mut ChainCodec, &[HEAD_PREFIX])) + option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX])) } fn head_header(&self) -> Result { - let head: Tip = option_to_not_found(self.db.get_dec(&mut ChainCodec, &[HEAD_PREFIX]))?; + let head: Tip = try!(option_to_not_found(self.db.get_ser(&vec![HEAD_PREFIX]))); self.get_block_header(&head.last_block_h) } fn save_head(&self, t: &Tip) -> Result<(), Error> { self.db .batch() - .put_enc(&mut ChainCodec, &[HEAD_PREFIX], t.clone())? - .put_enc(&mut ChainCodec, &[HEADER_HEAD_PREFIX], t.clone())? + .put_ser(&vec![HEAD_PREFIX], t)? + .put_ser(&vec![HEADER_HEAD_PREFIX], t)? .write() } fn save_body_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_enc(&mut ChainCodec, &[HEAD_PREFIX], t.clone()) + self.db.put_ser(&vec![HEAD_PREFIX], t) } fn get_header_head(&self) -> Result { - option_to_not_found(self.db.get_dec(&mut ChainCodec, &[HEADER_HEAD_PREFIX])) + option_to_not_found(self.db.get_ser(&vec![HEADER_HEAD_PREFIX])) } fn save_header_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_enc(&mut ChainCodec, &[HEADER_HEAD_PREFIX], t.clone()) + self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t) } fn get_block(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), - &to_key(BLOCK_PREFIX, &mut h.to_vec()))) + option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec()))) } fn get_block_header(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), - &to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec()))) + option_to_not_found(self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec()))) } fn check_block_exists(&self, h: &Hash) -> Result { @@ -94,12 +89,9 @@ impl ChainStore for ChainKVStore { // saving the block and its header let mut batch = self.db .batch() - .put_enc(&mut BlockCodec::default(), - &to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..], - b.clone())? - .put_enc(&mut BlockCodec::default(), - &to_key(BLOCK_HEADER_PREFIX, &mut b.hash().to_vec())[..], - b.header.clone())?; + .put_ser(&to_key(BLOCK_PREFIX, &mut b.hash().to_vec())[..], b)? + .put_ser(&to_key(BLOCK_HEADER_PREFIX, &mut b.hash().to_vec())[..], + &b.header)?; // saving the full output under its hash, as well as a commitment to hash index for out in &b.outputs { @@ -113,14 +105,12 @@ impl ChainStore for ChainKVStore { } fn save_block_header(&self, bh: &BlockHeader) -> Result<(), Error> { - self.db.put_enc(&mut BlockCodec::default(), - &to_key(BLOCK_HEADER_PREFIX, &mut bh.hash().to_vec())[..], - bh.clone()) + self.db.put_ser(&to_key(BLOCK_HEADER_PREFIX, &mut bh.hash().to_vec())[..], + bh) } fn get_header_by_height(&self, height: u64) -> Result { - option_to_not_found(self.db.get_dec(&mut BlockCodec::default(), - &u64_to_key(HEADER_HEIGHT_PREFIX, height))) + option_to_not_found(self.db.get_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, height))) } fn get_output_by_commit(&self, commit: &Commitment) -> Result { @@ -131,15 +121,11 @@ impl ChainStore for ChainKVStore { fn has_output_commit(&self, commit: &Commitment) -> Result { option_to_not_found(self.db - .get_dec(&mut BlockCodec::default(), - &to_key(OUTPUT_COMMIT_PREFIX, &mut commit.as_ref().to_vec()))) + .get_ser(&to_key(OUTPUT_COMMIT_PREFIX, &mut commit.as_ref().to_vec()))) } fn setup_height(&self, bh: &BlockHeader) -> Result<(), Error> { - self.db - .put_enc(&mut BlockCodec::default(), - &u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), - bh.clone())?; + self.db.put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), bh)?; let mut prev_h = bh.previous; let mut prev_height = bh.height - 1; @@ -148,9 +134,8 @@ impl ChainStore for ChainKVStore { if prev.hash() != prev_h { let real_prev = self.get_block_header(&prev_h)?; self.db - .put_enc(&mut BlockCodec::default(), - &u64_to_key(HEADER_HEIGHT_PREFIX, real_prev.height), - real_prev.clone())?; + .put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, real_prev.height), + &real_prev); prev_h = real_prev.previous; prev_height = real_prev.height - 1; } else { diff --git a/chain/src/types.rs b/chain/src/types.rs index 28669ca12..306b3f27a 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -20,6 +20,7 @@ use grin_store::Error; use core::core::{Block, BlockHeader, Output}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; +use core::ser; /// The tip of a fork. A handle to the fork ancestry from its leaf in the /// blockchain tree. References the max height and the latest and previous @@ -59,6 +60,31 @@ impl Tip { } } +/// Serialization of a tip, required to save to datastore. +impl ser::Writeable for Tip { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + try!(writer.write_u64(self.height)); + try!(writer.write_fixed_bytes(&self.last_block_h)); + try!(writer.write_fixed_bytes(&self.prev_block_h)); + self.total_difficulty.write(writer) + } +} + +impl ser::Readable for Tip { + fn read(reader: &mut ser::Reader) -> Result { + let height = try!(reader.read_u64()); + let last = try!(Hash::read(reader)); + let prev = try!(Hash::read(reader)); + let diff = try!(Difficulty::read(reader)); + Ok(Tip { + height: height, + last_block_h: last, + prev_block_h: prev, + total_difficulty: diff, + }) + } +} + /// Trait the chain pipeline requires an implementor for in order to process /// blocks. pub trait ChainStore: Send + Sync { diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 6b1498525..1f3a581c0 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -36,7 +36,7 @@ bitflags! { } /// Block header, fairly standard compared to other blockchains. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct BlockHeader { /// Height of this block since the genesis block (height 0) pub height: u64, @@ -139,7 +139,7 @@ impl Readable for BlockHeader { /// non-explicit, assumed to be deducible from block height (similar to /// bitcoin's schedule) and expressed as a global transaction fee (added v.H), /// additive to the total of fees ever collected. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct Block { /// The header with metadata and commitments to the rest of the data pub header: BlockHeader, diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 67941795e..7ac1094ba 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -37,7 +37,7 @@ bitflags! { /// Pedersen commitment and the signature, that guarantees that the commitments /// amount to zero. The signature signs the fee, which is retained for /// signature validation. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct TxKernel { /// Options for a kernel's structure or use pub features: KernelFeatures, @@ -88,7 +88,7 @@ impl TxKernel { } /// A transaction -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct Transaction { /// Set of inputs spent by the transaction. pub inputs: Vec, @@ -239,7 +239,7 @@ impl Transaction { /// A transaction input, mostly a reference to an output being spent by the /// transaction. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone)] pub struct Input(pub Commitment); /// Implementation of Writeable for a transaction Input, defines how to write diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index b2f485f2b..c2e1afc8a 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -5,8 +5,6 @@ authors = ["Ignotus Peverell "] workspace = ".." [dependencies] -tokio-io = "0.1.1" -bytes = "0.4.2" bitflags = "^0.7.0" byteorder = "^0.5" futures = "^0.1.9" @@ -17,7 +15,6 @@ serde = "~1.0.8" serde_derive = "~1.0.8" tokio-core="^0.1.1" tokio-timer="^0.1.0" - time = "^0.1" enum_primitive = "^0.1.0" num = "^0.1.36" @@ -25,7 +22,7 @@ num = "^0.1.36" grin_core = { path = "../core" } grin_store = { path = "../store" } grin_util = { path = "../util" } -secp256k1zkp = { path = "../secp256k1zkp" } [dev-dependencies] env_logger = "^0.3" +secp256k1zkp = { path = "../secp256k1zkp" } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 5953b5422..f68d4e968 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -34,16 +34,12 @@ extern crate futures; #[macro_use] extern crate tokio_core; extern crate tokio_timer; -extern crate tokio_io; -extern crate bytes; extern crate rand; extern crate serde; #[macro_use] extern crate serde_derive; extern crate time; extern crate num; -extern crate secp256k1zkp as secp; - mod conn; pub mod handshake; @@ -53,8 +49,6 @@ mod protocol; mod server; mod store; mod types; -mod msg_codec; -mod peer_codec; pub use server::{Server, DummyAdapter}; pub use peer::Peer; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 8240a16bc..b053fe41d 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -36,7 +36,7 @@ pub const PROTOCOL_VERSION: u32 = 1; pub const USER_AGENT: &'static str = "MW/Grin 0.1"; /// Magic number expected in the header of every message -pub const MAGIC: [u8; 2] = [0x1e, 0xc5]; +const MAGIC: [u8; 2] = [0x1e, 0xc5]; /// Size in bytes of a message header pub const HEADER_LEN: u64 = 11; @@ -123,7 +123,7 @@ pub fn write_msg(conn: TcpStream, /// Header of any protocol message, used to identify incoming messages. pub struct MsgHeader { - pub magic: [u8; 2], + magic: [u8; 2], /// Type of the message. pub msg_type: Type, /// Tota length of the message in bytes. @@ -177,7 +177,6 @@ impl Readable for MsgHeader { /// First part of a handshake, sender advertises its version and /// characteristics. -#[derive(Clone, Debug, PartialEq)] pub struct Hand { /// protocol version of the sender pub version: u32, @@ -233,7 +232,6 @@ impl Readable for Hand { /// Second part of a handshake, receiver of the first part replies with its own /// version and characteristics. -#[derive(Clone, Debug, PartialEq)] pub struct Shake { /// sender version pub version: u32, @@ -275,7 +273,6 @@ impl Readable for Shake { } /// Ask for other peers addresses, required for network discovery. -#[derive(Clone, Debug, PartialEq)] pub struct GetPeerAddrs { /// Filters on the capabilities we'd like the peers to have pub capabilities: Capabilities, @@ -297,7 +294,6 @@ impl Readable for GetPeerAddrs { /// Peer addresses we know of that are fresh enough, in response to /// GetPeerAddrs. -#[derive(Clone, Debug, PartialEq)] pub struct PeerAddrs { pub peers: Vec, } @@ -331,7 +327,6 @@ impl Readable for PeerAddrs { /// We found some issue in the communication, sending an error back, usually /// followed by closing the connection. -#[derive(Clone, Debug, PartialEq)] pub struct PeerError { /// error code pub code: u32, @@ -360,7 +355,6 @@ impl Readable for PeerError { /// Only necessary so we can implement Readable and Writeable. Rust disallows /// implementing traits when both types are outside of this crate (which is the /// case for SocketAddr and Readable/Writeable). -#[derive(Clone, Debug, PartialEq)] pub struct SockAddr(pub SocketAddr); impl Writeable for SockAddr { @@ -414,7 +408,6 @@ impl Readable for SockAddr { } /// Serializable wrapper for the block locator. -#[derive(Clone, Debug, PartialEq)] pub struct Locator { pub hashes: Vec, } @@ -441,7 +434,6 @@ impl Readable for Locator { } /// Serializable wrapper for a list of block headers. -#[derive(Clone, Debug, PartialEq)] pub struct Headers { pub headers: Vec, } diff --git a/p2p/src/msg_codec.rs b/p2p/src/msg_codec.rs deleted file mode 100644 index ff7feb287..000000000 --- a/p2p/src/msg_codec.rs +++ /dev/null @@ -1,952 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Implementation of the p2p message encoding and decoding. - -use std::io; -use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr, IpAddr}; - -use tokio_io::*; -use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; -use tokio_io::codec::{Encoder, Decoder}; -use enum_primitive::FromPrimitive; - -use core::core::{Block, BlockHeader, Input, Output, Transaction, TxKernel}; -use core::core::hash::Hash; -use core::core::target::Difficulty; -use core::core::transaction::{OutputFeatures, KernelFeatures}; -use types::*; - -use secp::pedersen::{RangeProof, Commitment}; -use secp::constants::PEDERSEN_COMMITMENT_SIZE; - -use grin_store::codec::{BlockCodec, TxCodec}; - -use msg::*; -use msg::MsgHeader; - -const MSG_HEADER_SIZE: usize = 11; -const SOCKET_ADDR_MARKER_V4: u8 = 0; -const SOCKET_ADDR_MARKER_V6: u8 = 1; - -// Convenience Macro for Option Handling in Decoding -macro_rules! try_opt_dec { - ($e: expr) => (match $e { - Some(val) => val, - None => return Ok(None), - }); -} - -#[derive(Clone, Debug, PartialEq)] -enum Message { - PeerError(PeerError), - Hand(Hand), - Shake(Shake), - Ping, - Pong, - GetPeerAddrs(GetPeerAddrs), - PeerAddrs(PeerAddrs), - GetHeaders(Locator), - Headers(Headers), - GetBlock(Hash), - Block(Block), - Transaction(Transaction), -} - -/// Codec for Decoding and Encoding a `MsgHeader` -#[derive(Debug, Clone, Default)] -struct MsgCodec; - -impl codec::Encoder for MsgCodec { - type Item = Message; - type Error = io::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.reserve(MSG_HEADER_SIZE); - - let mut msg_dst = BytesMut::with_capacity(0); - - let header = match item { - Message::Pong => MsgHeader::new(Type::Pong, 0), - Message::Ping => MsgHeader::new(Type::Ping, 0), - Message::Hand(hand) => { - hand.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::Hand, msg_dst.len() as u64) - } - Message::Shake(shake) => { - shake.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::Shake, msg_dst.len() as u64) - } - Message::GetPeerAddrs(get_peer_addrs) => { - get_peer_addrs.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::GetPeerAddrs, msg_dst.len() as u64) - } - Message::PeerAddrs(peer_addrs) => { - peer_addrs.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::PeerAddrs, msg_dst.len() as u64) - } - Message::Headers(headers) => { - headers.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::Headers, msg_dst.len() as u64) - } - Message::GetHeaders(locator) => { - locator.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::GetHeaders, msg_dst.len() as u64) - } - Message::Block(block) => { - let mut codec = BlockCodec::default(); - codec.encode(block, &mut msg_dst)?; - MsgHeader::new(Type::Block, msg_dst.len() as u64) - } - Message::GetBlock(hash) => { - let mut codec = BlockCodec::default(); - codec.encode(hash, &mut msg_dst)?; - MsgHeader::new(Type::GetBlock, msg_dst.len() as u64) - } - Message::Transaction(tx) => { - let mut codec = TxCodec::default(); - codec.encode(tx, &mut msg_dst)?; - MsgHeader::new(Type::Transaction, msg_dst.len() as u64) - } - Message::PeerError(err) => { - err.msg_encode(&mut msg_dst)?; - MsgHeader::new(Type::Error, msg_dst.len() as u64) - } - }; - - // Put MsgHeader - dst.put_slice(&header.magic); - dst.put_u8(header.msg_type as u8); - dst.put_u64::(header.msg_len); - - // Put Body - dst.reserve(msg_dst.len()); - dst.put_slice(&msg_dst); - - Ok(()) - - } -} - -impl codec::Decoder for MsgCodec { - type Item = Message; - type Error = io::Error; - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // Create Temporary Buffer - let ref mut temp_src = src.clone(); - - // Decode Header - if temp_src.len() < MSG_HEADER_SIZE { - return Ok(None); - } - let mut buf = temp_src.split_to(MSG_HEADER_SIZE).into_buf(); - - // Get Magic - let mut some_magic = [0; 2]; - buf.copy_to_slice(&mut some_magic); - - // If Magic is invalid return error. - if some_magic != MAGIC { - return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Header")); - } - - // Ensure Valid Message Type - let msg_type = match Type::from_u8(buf.get_u8()) { - Some(t) => t, - None => { - return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Message Type")); - } - }; - - // Ensure sufficient data - let msg_len = buf.get_u64::() as usize; - if temp_src.len() < msg_len { - return Ok(None); - } - - // Attempt message body decode - let decoded_msg = match msg_type { - Type::Ping => Message::Ping, - Type::Pong => Message::Pong, - Type::Hand => { - let hand = try_opt_dec!(Hand::msg_decode(temp_src)?); - Message::Hand(hand) - } - Type::Shake => { - let shake = try_opt_dec!(Shake::msg_decode(temp_src)?); - Message::Shake(shake) - } - Type::GetPeerAddrs => { - let get_peer_addrs = try_opt_dec!(GetPeerAddrs::msg_decode(temp_src)?); - Message::GetPeerAddrs(get_peer_addrs) - } - Type::PeerAddrs => { - let peer_addrs = try_opt_dec!(PeerAddrs::msg_decode(temp_src)?); - Message::PeerAddrs(peer_addrs) - } - Type::Headers => { - let headers = try_opt_dec!(Headers::msg_decode(temp_src)?); - Message::Headers(headers) - } - Type::GetHeaders => { - let locator = try_opt_dec!(Locator::msg_decode(temp_src)?); - Message::GetHeaders(locator) - } - Type::Block => { - let mut codec = BlockCodec::default(); - let block = try_opt_dec!(codec.decode(temp_src)?); - Message::Block(block) - } - Type::GetBlock => { - let mut codec = BlockCodec::default(); - let hash = try_opt_dec!(codec.decode(temp_src)?); - Message::GetBlock(hash) - } - Type::Transaction => { - let mut codec = TxCodec::default(); - let transaction = try_opt_dec!(codec.decode(temp_src)?); - Message::Transaction(transaction) - } - Type::Error => { - let err = try_opt_dec!(PeerError::msg_decode(temp_src)?); - Message::PeerError(err) - } - }; - - // If succesfull truncate src by bytes read from temp_src; - let diff = src.len() - temp_src.len(); - src.split_to(diff); - - Ok(Some(decoded_msg)) - } -} - -// Internal Convenience Traits -pub trait MsgEncode: Sized { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; -} - -// Internal Convenience Trait -pub trait MsgDecode: Sized { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error>; -} - -impl MsgEncode for Hand { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - // Reserve for version(4), capabilities(4), nonce(8) - dst.reserve(16); - // Put Protocol Version - dst.put_u32::(self.version); - // Put Capabilities - dst.put_u32::(self.capabilities.bits()); - // Put Nonce - dst.put_u64::(self.nonce); - - // Put Difficulty with BlockCodec - BlockCodec::default() - .encode(self.total_difficulty.clone(), dst)?; - - // Put Sender Address - self.sender_addr.0.msg_encode(dst)?; - // Put Receier Address - self.receiver_addr.0.msg_encode(dst)?; - - // Put Size of String - let str_bytes = self.user_agent.as_bytes(); - dst.reserve(str_bytes.len() + 1); - - // Put Software Version - dst.put_u8(str_bytes.len() as u8); - dst.put_slice(str_bytes); - - Ok(()) - } -} - -impl MsgDecode for Hand { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { - // TODO: Check for Full Hand Size Upfront - if src.len() < 16 { - return Ok(None); - } - // Get Protocol Version, Capabilities, Nonce - let mut buf = src.split_to(16).into_buf(); - let version = buf.get_u32::(); - let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); - let nonce = buf.get_u64::(); - - // Get Total Difficulty - let total_difficulty = try_opt_dec!(BlockCodec::default().decode(src)?); - - // Get Sender and Receiver Addresses - let sender_addr = try_opt_dec!(SocketAddr::msg_decode(src)?); - let receiver_addr = try_opt_dec!(SocketAddr::msg_decode(src)?); - - - // Get Software Version - // TODO: Decide on Hand#user_agent size - if src.len() < 1 { - return Ok(None); - } - let mut buf = src.split_to(1).into_buf(); - let str_len = buf.get_u8() as usize; - if src.len() < str_len { - return Ok(None); - } - let buf = src.split_to(str_len).into_buf(); - let user_agent = String::from_utf8(buf.collect()) - .map_err(|_| { - io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version") - })?; - - Ok(Some(Hand { - version: version, - capabilities: capabilities, - nonce: nonce, - total_difficulty: total_difficulty, - sender_addr: SockAddr(sender_addr), - receiver_addr: SockAddr(receiver_addr), - user_agent: user_agent, - })) - - } -} - -impl MsgEncode for Shake { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - // Reserve for version(4), capabilities(4) - dst.reserve(8); - // Put Protocol Version - dst.put_u32::(self.version); - // Put Capabilities - dst.put_u32::(self.capabilities.bits()); - - // Put Difficulty with BlockCodec - BlockCodec::default() - .encode(self.total_difficulty.clone(), dst)?; - - // Reserve for user agent string Size of String - let str_bytes = self.user_agent.as_bytes(); - dst.reserve(str_bytes.len() + 1); - - // Put user agent string - dst.put_u8(str_bytes.len() as u8); - dst.put_slice(str_bytes); - - Ok(()) - } -} - -impl MsgDecode for Shake { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { - // TODO: Check for Full Hand Size Upfront - if src.len() < 8 { - return Ok(None); - } - // Get Protocol Version, Capabilities, Nonce - let mut buf = src.split_to(8).into_buf(); - let version = buf.get_u32::(); - let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); - - // Get Total Difficulty - let total_difficulty = try_opt_dec!(BlockCodec::default().decode(src)?); - - // Get Software Version - // TODO: Decide on Hand#user_agent size - if src.len() < 1 { - return Ok(None); - } - let mut buf = src.split_to(1).into_buf(); - let str_len = buf.get_u8() as usize; - if src.len() < str_len { - return Ok(None); - } - let buf = src.split_to(str_len).into_buf(); - let user_agent = String::from_utf8(buf.collect()) - .map_err(|_| { - io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version") - })?; - - Ok(Some(Shake { - version: version, - capabilities: capabilities, - total_difficulty: total_difficulty, - user_agent: user_agent, - })) - - } -} - -impl MsgEncode for GetPeerAddrs { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - // Reserve for and put Capabilities - dst.reserve(4); - dst.put_u32::(self.capabilities.bits()); - Ok(()) - } -} - -impl MsgDecode for GetPeerAddrs { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { - if src.len() < 4 { - return Ok(None); - } - let mut buf = src.split_to(4).into_buf(); - let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); - Ok(Some(GetPeerAddrs { capabilities: capabilities })) - } -} - -impl MsgEncode for PeerAddrs { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - dst.reserve(4); - dst.put_u32::(self.peers.len() as u32); - for p in &self.peers { - p.0.msg_encode(dst)?; - } - Ok(()) - } -} - -impl MsgDecode for PeerAddrs { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { - if src.len() < 4 { - return Ok(None); - } - let mut buf = src.split_to(4).into_buf(); - let peer_count = buf.get_u32::(); - // Check peer count is valid, return error or empty if applicable - if peer_count > MAX_PEER_ADDRS { - return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Count")); - } else if peer_count == 0 { - return Ok(Some(PeerAddrs { peers: vec![] })); - } - - let mut peers = Vec::with_capacity(peer_count as usize); - for _ in 0..peer_count { - let p = SocketAddr::msg_decode(src)?; - // XXX: Do not need SockAddr wrapper?? - peers.push(SockAddr(p.unwrap())); - } - - Ok(Some(PeerAddrs { peers: peers })) - } -} - -impl MsgEncode for Headers { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - dst.reserve(2); - dst.put_u16::(self.headers.len() as u16); - let mut block_codec = BlockCodec::default(); - for h in &self.headers { - block_codec.encode(h.clone(), dst)?; - } - - Ok(()) - } -} - -impl MsgDecode for Headers { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { - if src.len() < 2 { - return Ok(None); - } - // Get Headers Length - let mut buf = src.split_to(2).into_buf(); - let len = buf.get_u16::(); - - // Collect Headers - let mut headers = Vec::with_capacity(len as usize); - let mut block_codec = BlockCodec::default(); - for _ in 0..len { - let block = try_opt_dec!(block_codec.decode(src)?); - headers.push(block); - } - Ok(Some(Headers { headers: headers })) - } -} - -impl MsgEncode for Locator { - fn msg_encode(&self, dst: &mut BytesMut) -> Result< (), io::Error > { - dst.reserve(1); - let len = self.hashes.len() as u8; - dst.put_u8(len); - - let mut block_codec = BlockCodec::default(); - for h in &self.hashes { - block_codec.encode(h.clone(), dst)?; - } - Ok(()) - } -} - -impl MsgDecode for Locator { - fn msg_decode(src: &mut BytesMut) -> Result< Option, io::Error > { - if src.len() < 1 { - return Ok(None); - } - let mut buf = src.split_to(1).into_buf(); - let len = buf.get_u8(); - - let mut hashes = Vec::with_capacity(len as usize); - let mut block_codec = BlockCodec::default(); - for _ in 0..len { - let hash = try_opt_dec!(block_codec.decode(src)?); - hashes.push(hash); - } - - Ok(Some(Locator { hashes: hashes })) - } -} - -impl MsgEncode for PeerError { - fn msg_encode(&self, dst: &mut BytesMut) -> Result< (), io::Error > { - dst.reserve(4); - dst.put_u32::(self.code); - - let bytes = self.message.as_bytes(); - dst.reserve(1 + bytes.len()); - dst.put_u8(bytes.len() as u8); - dst.put_slice(bytes); - Ok(()) - } -} - - -impl MsgDecode for PeerError { - fn msg_decode(src: &mut BytesMut) -> Result< Option, io::Error > { - // Reserve for code(4) and msg length(1) - if src.len() < 5 { - return Ok(None); - } - let mut buf = src.split_to(5).into_buf(); - let code = buf.get_u32::(); - - let str_len = buf.get_u8() as usize; - - if src.len() < str_len { - return Ok(None); - } - - let buf = src.split_to(str_len).into_buf(); - - let message = String::from_utf8(buf.collect()) - .map_err(|_| { - io::Error::new(io::ErrorKind::InvalidData, "Invalid Error Message Field") - })?; - - Ok(Some( PeerError { - code: code, - message: message - })) - } -} - -impl MsgEncode for SocketAddr { - fn msg_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { - match *self { - SocketAddr::V4(sav4) => { - dst.reserve(7); - dst.put_u8(SOCKET_ADDR_MARKER_V4); - dst.put_slice(&sav4.ip().octets()); - dst.put_u16::(sav4.port()); - Ok(()) - } - SocketAddr::V6(sav6) => { - dst.reserve(19); - dst.put_u8(SOCKET_ADDR_MARKER_V6); - - for seg in &sav6.ip().segments() { - dst.put_u16::(*seg); - } - - dst.put_u16::(sav6.port()); - Ok(()) - } - } - } -} - -impl MsgDecode for SocketAddr { - fn msg_decode(src: &mut BytesMut) -> Result, io::Error> { - if src.len() < 7 { - return Ok(None); - } - - let marker = src.split_to(1)[0]; - match marker { - SOCKET_ADDR_MARKER_V4 => { - let mut buf = src.split_to(6).into_buf(); - - // Get V4 address - let mut ip = [0; 4]; - buf.copy_to_slice(&mut ip); - - // Get port - let port = buf.get_u16::(); - - // Build v4 socket - let socket = SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port); - Ok(Some(SocketAddr::V4(socket))) - } - SOCKET_ADDR_MARKER_V6 => { - if src.len() < 18 { - return Ok(None); - } - let mut buf = src.split_to(18).into_buf(); - - // Get V6 address - let mut ip = [0u16; 8]; - for i in 0..8 { - ip[i] = buf.get_u16::(); - } - - // Get Port - let port = buf.get_u16::(); - - // Build V6 socket - let socket = SocketAddrV6::new(Ipv6Addr::new(ip[0], - ip[1], - ip[2], - ip[3], - ip[4], - ip[5], - ip[6], - ip[7]), - port, - 0, - 0); - - Ok(Some(SocketAddr::V6(socket))) - } - _ => Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid Socket Marker")), - } - } -} - - - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn should_encode_decode_ping() { - let mut codec = MsgCodec; - let ping = Message::Ping; - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(ping.clone(), &mut buf) - .expect("Expected to encode ping message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode ping message") - .unwrap(); - - - assert_eq!(ping, result); - } - - fn should_handle_incomplete_without_modifying_src_ping() { - let mut codec = MsgCodec; - let ping = Message::Ping; - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(ping.clone(), &mut buf) - .expect("Expected to encode ping message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode ping message") - .unwrap(); - assert_eq!(ping, result); - } - - - #[test] - fn should_encode_decode_pong() { - let mut codec = MsgCodec; - let pong = Message::Pong; - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(pong.clone(), &mut buf) - .expect("Expected to encode pong message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode pong message") - .unwrap(); - assert_eq!(pong, result); - } - - #[test] - fn should_encode_decode_hand() { - let mut codec = MsgCodec; - let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 8080)); - let hand = Message::Hand(Hand { - version: 0, - capabilities: UNKNOWN, - nonce: 0, - total_difficulty: Difficulty::one(), - sender_addr: sample_socket_addr.clone(), - receiver_addr: sample_socket_addr.clone(), - user_agent: "test".to_string(), - }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(hand.clone(), &mut buf) - .expect("Expected to encode hand message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode hand message") - .expect("Expected a full hand message"); - - assert_eq!(hand, result); - } - - #[test] - fn should_encode_decode_shake() { - let mut codec = MsgCodec; - let shake = Message::Shake(Shake { - version: 0, - capabilities: UNKNOWN, - total_difficulty: Difficulty::one(), - user_agent: "test".to_string(), - }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(shake.clone(), &mut buf) - .expect("Expected to encode shake message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode shake message") - .expect("Expected full shake message."); - - assert_eq!(shake, result); - } - - #[test] - fn should_encode_decode_get_peer_addrs() { - let mut codec = MsgCodec; - let get_peer_addrs = Message::GetPeerAddrs(GetPeerAddrs { capabilities: UNKNOWN }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(get_peer_addrs.clone(), &mut buf) - .expect("Expected to encode get peer addrs message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode get peer addrs message") - .expect("Expected full get peer addrs message."); - - assert_eq!(get_peer_addrs, result); - } - - #[test] - fn should_encode_decode_peer_addrs() { - let mut codec = MsgCodec; - let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 8000)); - let peer_addrs = Message::PeerAddrs(PeerAddrs { peers: vec![sample_socket_addr] }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(peer_addrs.clone(), &mut buf) - .expect("Expected to encode peer addrs message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode peer addrs message") - .expect("Expected full peer addrs message"); - - assert_eq!(peer_addrs, result); - } - - #[test] - fn should_encode_decode_headers() { - let mut codec = MsgCodec; - let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 8000)); - - let headers = Message::Headers(Headers { headers: vec![BlockHeader::default()] }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(headers.clone(), &mut buf) - .expect("Expected to encode headers message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode headers message") - .expect("Expected full headers message"); - - assert_eq!(headers, result); - } - - #[test] - fn should_encode_decode_get_headers() { - let mut codec = MsgCodec; - let sample_socket_addr = SockAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - 8000)); - - let get_headers = Message::GetHeaders(Locator { hashes: vec![Hash([1; 32])] }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(get_headers.clone(), &mut buf) - .expect("Expected to encode get headers msg"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode get headers msg") - .unwrap(); - - assert_eq!(get_headers, result); - } - - #[test] - fn should_encode_decode_get_block() { - let mut codec = MsgCodec; - - let get_block = Message::GetBlock(Hash([1; 32])); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(get_block.clone(), &mut buf) - .expect("Expected to encode hand"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode hand") - .unwrap(); - - assert_eq!(get_block, result); - } - - #[test] - fn should_encode_decode_block() { - let mut codec = MsgCodec; - - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let kernel = TxKernel { - features: KernelFeatures::empty(), - excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - excess_sig: vec![1; 10], - fee: 100, - }; - - let new_block = Block { - header: BlockHeader::default(), - inputs: vec![input], - outputs: vec![output], - kernels: vec![kernel], - }; - - let block = Message::Block(new_block); - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(block.clone(), &mut buf) - .expect("Expected to encode"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode") - .unwrap(); - - assert_eq!(block, result); - } - - #[test] - fn should_encode_decode_transaction() { - let mut codec = MsgCodec; - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let transaction = Message::Transaction(Transaction { - inputs: vec![input], - outputs: vec![output], - fee: 1 as u64, - excess_sig: vec![0; 10], - }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(transaction.clone(), &mut buf) - .expect("Expected to encode transaction message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode transaction message") - .unwrap(); - - assert_eq!(transaction, result); - } - - #[test] - fn should_encode_decode_error() { - let mut codec = MsgCodec; - - let error = Message::PeerError(PeerError { - code: 0, - message: "Uhoh".to_owned(), - }); - - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(error.clone(), &mut buf) - .expect("Expected to encode error message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode error message") - .unwrap(); - - assert_eq!(error, result); - } -} diff --git a/p2p/src/peer_codec.rs b/p2p/src/peer_codec.rs deleted file mode 100644 index 221487e27..000000000 --- a/p2p/src/peer_codec.rs +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Implementation of the peer data encoding and decoding - -use std::io; -use std::net::{SocketAddr, Ipv4Addr, Ipv6Addr, IpAddr}; - -use tokio_io::*; -use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; -use tokio_io::codec::{Encoder, Decoder}; -use enum_primitive::FromPrimitive; - -use types::*; -use msg_codec::{MsgDecode, MsgEncode}; -use store::{State, PeerData}; - -// Convenience Macro for Option Handling in Decoding -macro_rules! try_opt_dec { - ($e: expr) => (match $e { - Some(val) => val, - None => return Ok(None), - }); -} - -/// Codec for Decoding and Encoding a `PeerData` -#[derive(Debug, Clone, Default)] -pub struct PeerCodec; - -impl codec::Encoder for PeerCodec { - type Item = PeerData; - type Error = io::Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - // Put socket address as u32 - MsgEncode::msg_encode(&item.addr, dst)?; - - // Put capabilities - dst.reserve(4); - dst.put_u32::(item.capabilities.bits()); - - // Put user agent string with u8 length first - let str_bytes = item.user_agent.as_bytes(); - dst.reserve(str_bytes.len() + 1); - dst.put_u8(str_bytes.len() as u8); - dst.put_slice(str_bytes); - - // Put flags - dst.reserve(1); - dst.put_u8(item.flags as u8); - - Ok(()) - } -} - -impl codec::Decoder for PeerCodec { - type Item = PeerData; - type Error = io::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // Create Temporary Buffer - let ref mut temp_src = src.clone(); - - // Get socket address - let addr = try_opt_dec!(SocketAddr::msg_decode(temp_src)?); - - // Check for capabilites flags(4), user agent header(1) - if temp_src.len() < 5 { - return Ok(None); - } - - // Get capabilites - let mut buf = temp_src.split_to(5).into_buf(); - let capabilities = Capabilities::from_bits(buf.get_u32::()).unwrap_or(UNKNOWN); - - // Check for user agent length(str_len) and flags(1) - let str_len = buf.get_u8() as usize; - if temp_src.len() < str_len + 1 { - return Ok(None); - } - - // Get User Agent - let buf = temp_src.split_to(str_len).into_buf(); - let user_agent = String::from_utf8(buf.collect()) - .map_err(|_| { - io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version") - })?; - - // Get flags - let mut buf = temp_src.split_to(1).into_buf(); - let flags_data = buf.get_u8(); - let flags = State::from_u8(flags_data) - .ok_or(io::Error::new(io::ErrorKind::InvalidData, "Invalid Hand Software Version"))?; - - // If succesfull truncate src by bytes read from temp_src; - let diff = src.len() - temp_src.len(); - src.split_to(diff); - - Ok(Some(PeerData { - addr: addr, - capabilities: capabilities, - user_agent: user_agent, - flags: flags, - })) - } -} - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn should_encode_decode_peer_data() { - let mut codec = PeerCodec; - let peer_data = PeerData { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000), - capabilities: UNKNOWN, - user_agent: "foo".to_string(), - flags: State::Healthy, - }; - let mut buf = BytesMut::with_capacity(0); - - codec - .encode(peer_data.clone(), &mut buf) - .expect("Expected to encode peer data message"); - - let result = codec - .decode(&mut buf) - .expect("Expected no Errors to decode peer data message") - .unwrap(); - - assert_eq!(peer_data, result); - } -} \ No newline at end of file diff --git a/p2p/src/proto.rs b/p2p/src/proto.rs index fbfdcb709..cadd61faa 100644 --- a/p2p/src/proto.rs +++ b/p2p/src/proto.rs @@ -44,8 +44,7 @@ impl Codec for GrinCodec { fn encode(&mut self, msg: Self::In, mut buf: &mut Vec) -> io::Result<()> { match msg { Frame::Message{id, message, ..} => { - ser::serialize(&mut buf, &message) - .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Encoding error: {:?}", e)))?; + ser::serialize(&mut buf, &message).map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Encoding error: {:?}", e)))?; }, Frame::Body{id, chunk} => { if let Some(chunk) = chunk { diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 34b2883ee..59a7b03f0 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -22,10 +22,8 @@ use grin_store::{self, Error, to_key, option_to_not_found}; use msg::SockAddr; use types::Capabilities; -use tokio_io::codec::{Encoder, Decoder}; -use peer_codec::PeerCodec; - const STORE_SUBPATH: &'static str = "peers"; + const PEER_PREFIX: u8 = 'p' as u8; /// Types of messages @@ -39,7 +37,7 @@ enum_from_primitive! { } /// Data stored for any given peer we've encountered. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug)] pub struct PeerData { /// Network address of the peer. pub addr: SocketAddr, @@ -52,6 +50,37 @@ pub struct PeerData { pub flags: State, } +impl Writeable for PeerData { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + SockAddr(self.addr).write(writer)?; + ser_multiwrite!(writer, + [write_u32, self.capabilities.bits()], + [write_bytes, &self.user_agent], + [write_u8, self.flags as u8]); + Ok(()) + } +} + +impl Readable for PeerData { + fn read(reader: &mut Reader) -> Result { + let addr = SockAddr::read(reader)?; + let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); + let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; + let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; + match State::from_u8(fl) { + Some(flags) => { + Ok(PeerData { + addr: addr.0, + capabilities: capabilities, + user_agent: user_agent, + flags: flags, + }) + } + None => Err(ser::Error::CorruptedData), + } + } +} + /// Storage facility for peer data. pub struct PeerStore { db: grin_store::Store, @@ -65,30 +94,25 @@ impl PeerStore { } pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { - let key = to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes()); - self.db.put_enc(&mut PeerCodec, &key, p.clone()) + self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], + p) } fn get_peer(&self, peer_addr: SocketAddr) -> Result { - let key = peer_key(peer_addr); - option_to_not_found(self.db.get_dec(&mut PeerCodec, &key)) + option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..])) } pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { - let key = peer_key(peer_addr); - self.db.exists(&key) + self.db.exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) } pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { - let key = peer_key(peer_addr); - self.db.delete(&key) + self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) } pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - - let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes()); - let peers_iter = self.db.iter_dec(PeerCodec::default(), &key); - + let peers_iter = self.db + .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); let mut peers = Vec::with_capacity(count); for p in peers_iter { if p.flags == state && p.capabilities.contains(cap) { diff --git a/store/src/codec/block.rs b/store/src/codec/block.rs index e37ce55bb..602608ae1 100644 --- a/store/src/codec/block.rs +++ b/store/src/codec/block.rs @@ -19,7 +19,6 @@ use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; use num_bigint::BigUint; use time::Timespec; use time; -use std::marker::PhantomData; use core::core::{Input, Output, Proof, TxKernel, Block, BlockHeader}; use core::core::hash::Hash; @@ -39,83 +38,34 @@ macro_rules! try_opt_dec { }); } -/// Internal Convenience Trait -pub trait BlockEncode: Sized { - fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; -} - -/// Internal Convenience Trait -pub trait BlockDecode: Sized { - fn block_decode(src: &mut BytesMut) -> Result, io::Error>; -} - -/// Decodes and encodes `Block`s and their subtypes #[derive(Debug, Clone)] -pub struct BlockCodec { - phantom: PhantomData, -} +pub struct BlockCodec; -impl Default for BlockCodec - where T: BlockDecode + BlockEncode -{ - fn default() -> Self { - BlockCodec { phantom: PhantomData } - } -} - -impl codec::Encoder for BlockCodec - where T: BlockDecode + BlockEncode -{ - type Item = T; +impl codec::Encoder for BlockCodec { + type Item = Block; type Error = io::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - T::block_encode(&item, dst) - } -} - -impl codec::Decoder for BlockCodec - where T: BlockDecode + BlockEncode -{ - type Item = T; - type Error = io::Error; - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // Create Temporary Buffer - let ref mut temp = src.clone(); - let res = try_opt_dec!(T::block_decode(temp)?); - - // If succesfull truncate src by bytes read from src; - let diff = src.len() - temp.len(); - src.split_to(diff); - - // Return Item - Ok(Some(res)) - } -} - - -impl BlockEncode for Block { - fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { // Put Header - self.header.block_encode(dst)?; + item.header.block_encode(dst)?; // Put Lengths of Inputs, Outputs and Kernels in 3 u64's dst.reserve(24); - dst.put_u64::(self.inputs.len() as u64); - dst.put_u64::(self.outputs.len() as u64); - dst.put_u64::(self.kernels.len() as u64); + dst.put_u64::(item.inputs.len() as u64); + dst.put_u64::(item.outputs.len() as u64); + dst.put_u64::(item.kernels.len() as u64); // Put Inputs - for inp in &self.inputs { + for inp in &item.inputs { inp.block_encode(dst)?; } // Put Outputs - for outp in &self.outputs { + for outp in &item.outputs { outp.block_encode(dst)?; } // Put TxKernels - for proof in &self.kernels { + for proof in &item.kernels { proof.block_encode(dst)?; } @@ -123,9 +73,10 @@ impl BlockEncode for Block { } } -impl BlockDecode for Block { - fn block_decode(src: &mut BytesMut) -> Result, io::Error> { - +impl codec::Decoder for BlockCodec { + type Item = Block; + type Error = io::Error; + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { // Get Header let header = try_opt_dec!(BlockHeader::block_decode(src)?); @@ -178,6 +129,16 @@ impl codec::Encoder for BlockHasher { } } +/// Internal Convenience Trait +trait BlockEncode: Sized { + fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; +} + +/// Internal Convenience Trait +trait BlockDecode: Sized { + fn block_decode(src: &mut BytesMut) -> Result, io::Error>; +} + impl BlockEncode for BlockHeader { fn block_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error> { partial_block_encode(self, dst)?; @@ -501,3 +462,195 @@ impl BlockDecode for Proof { Ok(Some(Proof(proof_data))) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_have_block_codec_roundtrip() { + use tokio_io::codec::{Encoder, Decoder}; + + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let kernel = TxKernel { + features: KernelFeatures::empty(), + excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + excess_sig: vec![1; 10], + fee: 100, + }; + + let block = Block { + header: BlockHeader::default(), + inputs: vec![input], + outputs: vec![output], + kernels: vec![kernel], + }; + + let mut buf = BytesMut::with_capacity(0); + let mut codec = BlockCodec {}; + codec.encode(block.clone(), &mut buf).expect("Error During Block Encoding"); + + let d_block = + codec.decode(&mut buf).expect("Error During Block Decoding").expect("Unfinished Block"); + + assert_eq!(block.header.height, d_block.header.height); + assert_eq!(block.header.previous, d_block.header.previous); + assert_eq!(block.header.timestamp, d_block.header.timestamp); + assert_eq!(block.header.cuckoo_len, d_block.header.cuckoo_len); + assert_eq!(block.header.utxo_merkle, d_block.header.utxo_merkle); + assert_eq!(block.header.tx_merkle, d_block.header.tx_merkle); + assert_eq!(block.header.features, d_block.header.features); + assert_eq!(block.header.nonce, d_block.header.nonce); + assert_eq!(block.header.pow, d_block.header.pow); + assert_eq!(block.header.difficulty, d_block.header.difficulty); + assert_eq!(block.header.total_difficulty, + d_block.header.total_difficulty); + + assert_eq!(block.inputs[0].commitment(), d_block.inputs[0].commitment()); + + assert_eq!(block.outputs[0].features, d_block.outputs[0].features); + assert_eq!(block.outputs[0].proof().as_ref(), + d_block.outputs[0].proof().as_ref()); + assert_eq!(block.outputs[0].commitment(), + d_block.outputs[0].commitment()); + + assert_eq!(block.kernels[0].features, d_block.kernels[0].features); + assert_eq!(block.kernels[0].excess, d_block.kernels[0].excess); + assert_eq!(block.kernels[0].excess_sig, d_block.kernels[0].excess_sig); + assert_eq!(block.kernels[0].fee, d_block.kernels[0].fee); + + } + + #[test] + fn should_encode_and_decode_blockheader() { + + let block_header = BlockHeader::default(); + + let mut buf = BytesMut::with_capacity(0); + block_header.block_encode(&mut buf); + + let d_block_header = BlockHeader::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(block_header.height, d_block_header.height); + assert_eq!(block_header.previous, d_block_header.previous); + assert_eq!(block_header.timestamp, d_block_header.timestamp); + assert_eq!(block_header.cuckoo_len, d_block_header.cuckoo_len); + assert_eq!(block_header.utxo_merkle, d_block_header.utxo_merkle); + assert_eq!(block_header.tx_merkle, d_block_header.tx_merkle); + assert_eq!(block_header.features, d_block_header.features); + assert_eq!(block_header.nonce, d_block_header.nonce); + assert_eq!(block_header.pow, d_block_header.pow); + assert_eq!(block_header.difficulty, d_block_header.difficulty); + assert_eq!(block_header.total_difficulty, + d_block_header.total_difficulty); + + } + + + #[test] + fn should_encode_and_decode_input() { + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let mut buf = BytesMut::with_capacity(0); + input.block_encode(&mut buf); + + assert_eq!([1; PEDERSEN_COMMITMENT_SIZE].as_ref(), buf); + assert_eq!(input.commitment(), + Input::block_decode(&mut buf) + .unwrap() + .unwrap() + .commitment()); + } + + #[test] + fn should_encode_and_decode_output() { + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let mut buf = BytesMut::with_capacity(0); + output.block_encode(&mut buf); + + let d_output = Output::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(output.features, d_output.features); + assert_eq!(output.proof().as_ref(), d_output.proof().as_ref()); + assert_eq!(output.commitment(), d_output.commitment()); + + } + + #[test] + fn should_encode_and_decode_txkernel() { + + let kernel = TxKernel { + features: KernelFeatures::empty(), + excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + excess_sig: vec![1; 10], + fee: 100, + }; + + let mut buf = BytesMut::with_capacity(0); + kernel.block_encode(&mut buf); + + let d_kernel = TxKernel::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(kernel.features, d_kernel.features); + assert_eq!(kernel.excess, d_kernel.excess); + assert_eq!(kernel.excess_sig, d_kernel.excess_sig); + assert_eq!(kernel.fee, d_kernel.fee); + } + + #[test] + fn should_encode_and_decode_difficulty() { + + let difficulty = Difficulty::from_num(1000); + + let mut buf = BytesMut::with_capacity(0); + difficulty.block_encode(&mut buf); + + let d_difficulty = Difficulty::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(difficulty, d_difficulty); + } + + #[test] + fn should_encode_and_decode_hash() { + + let hash = Hash([1u8; 32]); + + let mut buf = BytesMut::with_capacity(0); + hash.block_encode(&mut buf); + + let d_hash = Hash::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(hash, d_hash); + } + + #[test] + fn should_encode_and_decode_proof() { + + let proof = Proof::zero(); + + let mut buf = BytesMut::with_capacity(0); + proof.block_encode(&mut buf); + + let d_proof = Proof::block_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(proof, d_proof); + } +} \ No newline at end of file diff --git a/store/src/codec/block_test.rs b/store/src/codec/block_test.rs deleted file mode 100644 index b512ff44a..000000000 --- a/store/src/codec/block_test.rs +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::io; - -use tokio_io::*; -use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; -use num_bigint::BigUint; -use time::Timespec; -use time; -use std::marker::PhantomData; - -use core::core::{Input, Output, Proof, TxKernel, Block, BlockHeader}; -use core::core::hash::Hash; -use core::core::target::Difficulty; -use core::core::transaction::{OutputFeatures, KernelFeatures}; -use core::core::block::BlockFeatures; -use core::consensus::PROOFSIZE; - -use secp::pedersen::{RangeProof, Commitment}; -use secp::constants::PEDERSEN_COMMITMENT_SIZE; - -use super::block::*; - -#[test] -fn should_have_block_codec_roundtrip() { - use tokio_io::codec::{Encoder, Decoder}; - - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let kernel = TxKernel { - features: KernelFeatures::empty(), - excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - excess_sig: vec![1; 10], - fee: 100, - }; - - let block = Block { - header: BlockHeader::default(), - inputs: vec![input], - outputs: vec![output], - kernels: vec![kernel], - }; - - let mut buf = BytesMut::with_capacity(0); - let mut codec = BlockCodec::default(); - codec - .encode(block.clone(), &mut buf) - .expect("Error During Block Encoding"); - - let d_block = codec - .decode(&mut buf) - .expect("Error During Block Decoding") - .expect("Unfinished Block"); - - // Check if all bytes are read - assert_eq!(buf.len(), 0); - assert_eq!(block, d_block); - -} - -#[test] -fn should_have_block_header_codec_roundtrip() { - use tokio_io::codec::{Encoder, Decoder}; - - let mut codec = BlockCodec::default(); - let block_header = BlockHeader::default(); - - let mut buf = BytesMut::with_capacity(0); - codec.encode(block_header.clone(), &mut buf); - - let d_block_header = codec.decode(&mut buf).unwrap().unwrap(); - - assert_eq!(block_header, d_block_header); - -} - - -#[test] -fn should_encode_and_decode_input() { - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let mut buf = BytesMut::with_capacity(0); - input.block_encode(&mut buf); - - let d_input = Input::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(input,d_input) -} - -#[test] -fn should_encode_and_decode_output() { - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let mut buf = BytesMut::with_capacity(0); - output.block_encode(&mut buf); - - let d_output = Output::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(output, d_output); -} - -#[test] -fn should_encode_and_decode_txkernel() { - - let kernel = TxKernel { - features: KernelFeatures::empty(), - excess: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - excess_sig: vec![1; 10], - fee: 100, - }; - - let mut buf = BytesMut::with_capacity(0); - kernel.block_encode(&mut buf); - - let d_kernel = TxKernel::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(kernel, d_kernel); -} - -#[test] -fn should_encode_and_decode_difficulty() { - - let difficulty = Difficulty::from_num(1000); - - let mut buf = BytesMut::with_capacity(0); - difficulty.block_encode(&mut buf); - - let d_difficulty = Difficulty::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(difficulty, d_difficulty); -} - -#[test] -fn should_encode_and_decode_hash() { - - let hash = Hash([1u8; 32]); - - let mut buf = BytesMut::with_capacity(0); - hash.block_encode(&mut buf); - - let d_hash = Hash::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(hash, d_hash); -} - -#[test] -fn should_encode_and_decode_proof() { - - let proof = Proof::zero(); - - let mut buf = BytesMut::with_capacity(0); - proof.block_encode(&mut buf); - - let d_proof = Proof::block_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(proof, d_proof); -} diff --git a/store/src/codec/mod.rs b/store/src/codec/mod.rs index 05921491c..97901f273 100644 --- a/store/src/codec/mod.rs +++ b/store/src/codec/mod.rs @@ -12,15 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Codecs for Blocks and Transactions - pub mod block; pub mod tx; -#[cfg(test)] -mod block_test; -#[cfg(test)] -mod tx_test; - -pub use self::block::{BlockCodec, BlockHasher }; +pub use self::block::{BlockCodec, BlockHasher}; pub use self::tx::TxCodec; + diff --git a/store/src/codec/tx.rs b/store/src/codec/tx.rs index 27d19f729..72bca4a1f 100644 --- a/store/src/codec/tx.rs +++ b/store/src/codec/tx.rs @@ -31,8 +31,7 @@ macro_rules! try_opt_dec { }); } -/// Decodes and encodes `Transaction`s -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct TxCodec; impl codec::Encoder for TxCodec { @@ -122,12 +121,12 @@ impl codec::Decoder for TxCodec { } /// Internal Convenience Trait -pub trait TxEncode: Sized { +trait TxEncode: Sized { fn tx_encode(&self, dst: &mut BytesMut) -> Result<(), io::Error>; } /// Internal Convenience Trait -pub trait TxDecode: Sized { +trait TxDecode: Sized { fn tx_decode(src: &mut BytesMut) -> Result, io::Error>; } @@ -188,4 +187,85 @@ impl TxDecode for Input { Ok(Some(Input(Commitment(c)))) } -} \ No newline at end of file +} + + + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_have_tx_codec_roundtrip() { + use tokio_io::codec::{Encoder, Decoder}; + + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let tx = Transaction { + inputs: vec![input], + outputs: vec![output], + fee: 0, + excess_sig: vec![0; 10], + }; + + let mut buf = BytesMut::with_capacity(0); + let mut codec = TxCodec {}; + codec.encode(tx.clone(), &mut buf).expect("Error During Transaction Encoding"); + + let d_tx = codec.decode(&mut buf) + .expect("Error During Transaction Decoding") + .expect("Unfinished Transaction"); + + assert_eq!(tx.inputs[0].commitment(), d_tx.inputs[0].commitment()); + assert_eq!(tx.outputs[0].features, d_tx.outputs[0].features); + assert_eq!(tx.fee, d_tx.fee); + assert_eq!(tx.excess_sig, d_tx.excess_sig); + } + + #[test] + fn should_encode_and_decode_output() { + let output = Output { + features: OutputFeatures::empty(), + commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), + proof: RangeProof { + proof: [1; 5134], + plen: 5134, + }, + }; + + let mut buf = BytesMut::with_capacity(0); + output.tx_encode(&mut buf); + + let d_output = Output::tx_decode(&mut buf).unwrap().unwrap(); + + assert_eq!(output.features, d_output.features); + assert_eq!(output.proof().as_ref(), d_output.proof().as_ref()); + assert_eq!(output.commitment(), d_output.commitment()); + + } + + #[test] + fn should_encode_and_decode_input() { + let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); + + let mut buf = BytesMut::with_capacity(0); + input.tx_encode(&mut buf); + + assert_eq!([1; PEDERSEN_COMMITMENT_SIZE].as_ref(), buf); + assert_eq!(input.commitment(), + Input::tx_decode(&mut buf) + .unwrap() + .unwrap() + .commitment()); + } +} diff --git a/store/src/codec/tx_test.rs b/store/src/codec/tx_test.rs deleted file mode 100644 index da1d7a882..000000000 --- a/store/src/codec/tx_test.rs +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2016 The Grin Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::io; - -use tokio_io::*; -use bytes::{BytesMut, BigEndian, BufMut, Buf, IntoBuf}; - -use core::core::{Input, Output, Transaction}; -use core::core::transaction::OutputFeatures; - -use secp::pedersen::{RangeProof, Commitment}; -use secp::constants::PEDERSEN_COMMITMENT_SIZE; - -use super::tx::*; - -#[test] -fn should_have_tx_codec_roundtrip() { - use tokio_io::codec::{Encoder, Decoder}; - - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let tx = Transaction { - inputs: vec![input], - outputs: vec![output], - fee: 0, - excess_sig: vec![0; 10], - }; - - let mut buf = BytesMut::with_capacity(0); - let mut codec = TxCodec {}; - codec - .encode(tx.clone(), &mut buf) - .expect("Error During Transaction Encoding"); - - let d_tx = codec - .decode(&mut buf) - .expect("Error During Transaction Decoding") - .expect("Unfinished Transaction"); - - assert_eq!(tx, d_tx); -} - -#[test] -fn should_encode_and_decode_output() { - let output = Output { - features: OutputFeatures::empty(), - commit: Commitment([1; PEDERSEN_COMMITMENT_SIZE]), - proof: RangeProof { - proof: [1; 5134], - plen: 5134, - }, - }; - - let mut buf = BytesMut::with_capacity(0); - output.tx_encode(&mut buf); - - let d_output = Output::tx_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(output, d_output); -} - -#[test] -fn should_encode_and_decode_input() { - let input = Input(Commitment([1; PEDERSEN_COMMITMENT_SIZE])); - - let mut buf = BytesMut::with_capacity(0); - input.tx_encode(&mut buf); - - let d_input = Input::tx_decode(&mut buf).unwrap().unwrap(); - - assert_eq!(input, d_input); -} diff --git a/store/src/lib.rs b/store/src/lib.rs index 42831edfa..75f20e10f 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -35,16 +35,13 @@ use std::fmt; use std::iter::Iterator; use std::marker::PhantomData; use std::sync::RwLock; -use tokio_io::codec::{Encoder,Decoder}; -use bytes::BytesMut; -use bytes::buf::{FromBuf, IntoBuf}; use byteorder::{WriteBytesExt, BigEndian}; use rocksdb::{DB, WriteBatch, DBCompactionStyle, DBIterator, IteratorMode, Direction}; use core::ser; -pub mod codec; +mod codec; use codec::{BlockCodec, BlockHasher, TxCodec}; /// Main error type for this crate. @@ -57,8 +54,6 @@ pub enum Error { RocksDbErr(String), /// Wraps a serialization error for Writeable or Readable SerErr(ser::Error), - /// Wraps an Io Error - Io(std::io::Error), } impl fmt::Display for Error { @@ -67,7 +62,6 @@ impl fmt::Display for Error { &Error::NotFoundErr => write!(f, "Not Found"), &Error::RocksDbErr(ref s) => write!(f, "RocksDb Error: {}", s), &Error::SerErr(ref e) => write!(f, "Serialization Error: {}", e.to_string()), - &Error::Io(ref e) => write!(f, "Codec Error: {}", e) } } } @@ -78,12 +72,6 @@ impl From for Error { } } -impl From for Error { - fn from(e: std::io::Error) -> Error { - Error::Io(e) - } -} - /// Thread-safe rocksdb wrapper pub struct Store { rdb: RwLock, @@ -110,25 +98,13 @@ impl Store { db.put(key, &value[..]).map_err(&From::from) } - /// Writes a single key and a value using a given encoder. - pub fn put_enc(&self, encoder: &mut E, key: &[u8], value: E::Item) -> Result<(), Error> - where Error: From { - - let mut data = BytesMut::with_capacity(0); - encoder.encode(value, &mut data)?; - self.put(key, data.to_vec()) - } - - /// Gets a value from the db, provided its key and corresponding decoder - pub fn get_dec(&self, decoder: &mut D, key: &[u8]) -> Result, Error> - where Error: From { - - let data = self.get(key)?; - if let Some(buf) = data { - let mut buf = BytesMut::from_buf(buf); - decoder.decode(&mut buf).map_err(From::from) - } else { - Ok(None) + /// Writes a single key and its `Writeable` value to the db. Encapsulates + /// serialization. + pub fn put_ser(&self, key: &[u8], value: &W) -> Result<(), Error> { + let ser_value = ser::ser_vec(value); + match ser_value { + Ok(data) => self.put(key, data), + Err(err) => Err(Error::SerErr(err)), } } @@ -138,6 +114,30 @@ impl Store { db.get(key).map(|r| r.map(|o| o.to_vec())).map_err(From::from) } + /// Gets a `Readable` value from the db, provided its key. Encapsulates + /// serialization. + pub fn get_ser(&self, key: &[u8]) -> Result, Error> { + self.get_ser_limited(key, 0) + } + + /// Gets a `Readable` value from the db, provided its key, allowing to + /// extract only partial data. The underlying Readable size must align + /// accordingly. Encapsulates serialization. + pub fn get_ser_limited(&self, + key: &[u8], + len: usize) + -> Result, Error> { + let data = try!(self.get(key)); + match data { + Some(val) => { + let mut lval = if len > 0 { &val[..len] } else { &val[..] }; + let r = try!(ser::deserialize(&mut lval).map_err(Error::SerErr)); + Ok(Some(r)) + } + None => Ok(None), + } + } + /// Whether the provided key exists pub fn exists(&self, key: &[u8]) -> Result { let db = self.rdb.read().unwrap(); @@ -150,15 +150,17 @@ impl Store { db.delete(key).map_err(From::from) } - /// Produces an iterator of items decoded by a decoder moving forward from the provided key. - pub fn iter_dec(&self, codec: D, from: &[u8]) -> DecIterator { + /// Produces an iterator of `Readable` types moving forward from the + /// provided + /// key. + pub fn iter(&self, from: &[u8]) -> SerIterator { let db = self.rdb.read().unwrap(); - DecIterator { + SerIterator { iter: db.iterator(IteratorMode::From(from, Direction::Forward)), - codec: codec + _marker: PhantomData, } } - + /// Builds a new batch to be used with this store. pub fn batch(&self) -> Batch { Batch { @@ -180,13 +182,17 @@ pub struct Batch<'a> { } impl<'a> Batch<'a> { - - /// Using a given encoder, Writes a single key and a value to the batch. - pub fn put_enc(mut self, encoder: &mut E, key: &[u8], value: E::Item) -> Result, Error> where Error: From { - let mut data = BytesMut::with_capacity(0); - encoder.encode(value, &mut data)?; - self.batch.put(key, &data)?; - Ok(self) + /// Writes a single key and its `Writeable` value to the batch. The write + /// function must be called to "commit" the batch to storage. + pub fn put_ser(mut self, key: &[u8], value: &W) -> Result, Error> { + let ser_value = ser::ser_vec(value); + match ser_value { + Ok(data) => { + self.batch.put(key, &data[..])?; + Ok(self) + } + Err(err) => Err(Error::SerErr(err)), + } } /// Writes the batch to RocksDb. @@ -195,24 +201,29 @@ impl<'a> Batch<'a> { } } -/// An iterator that produces items from a `DBIterator` instance with a given `Decoder`. -/// Iterates and decodes returned values -pub struct DecIterator where D: Decoder { +/// An iterator thad produces Readable instances back. Wraps the lower level +/// DBIterator and deserializes the returned values. +pub struct SerIterator + where T: ser::Readable +{ iter: DBIterator, - codec: D + _marker: PhantomData, } -impl Iterator for DecIterator where D: Decoder { - type Item = D::Item; - fn next(&mut self) -> Option { +impl Iterator for SerIterator + where T: ser::Readable +{ + type Item = T; + + fn next(&mut self) -> Option { let next = self.iter.next(); - next.and_then(|(_, v)| { - self.codec.decode(&mut BytesMut::from(v.as_ref())).ok() - }).unwrap_or(None) + next.and_then(|r| { + let (_, v) = r; + ser::deserialize(&mut &v[..]).ok() + }) } } - /// Build a db key from a prefix and a byte vector identifier. pub fn to_key(prefix: u8, k: &mut Vec) -> Vec { let mut res = Vec::with_capacity(k.len() + 2);