From a4476443bbdb376d42a27f46393426f9102533a4 Mon Sep 17 00:00:00 2001 From: Antioch Peverell Date: Tue, 18 Sep 2018 15:25:26 +0100 Subject: [PATCH] Compact transactions (initial prep work) (#1548) * introduce CompactTransaction (unused currently) * rustfmt * fix comments --- core/src/core/compact_transaction.rs | 179 +++++++++++++++++++++++++++ core/src/core/id.rs | 26 ++-- core/src/core/mod.rs | 2 + core/src/core/pmmr.rs | 3 +- core/src/core/transaction.rs | 6 +- core/src/core/verifier_cache.rs | 6 +- p2p/src/msg.rs | 2 + pool/src/pool.rs | 56 ++++++--- pool/src/transaction_pool.rs | 16 ++- servers/src/common/adapters.rs | 21 ++-- 10 files changed, 264 insertions(+), 53 deletions(-) create mode 100644 core/src/core/compact_transaction.rs diff --git a/core/src/core/compact_transaction.rs b/core/src/core/compact_transaction.rs new file mode 100644 index 000000000..c25e09fdc --- /dev/null +++ b/core/src/core/compact_transaction.rs @@ -0,0 +1,179 @@ +// Copyright 2018 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Compact Transactions. + +use rand::{thread_rng, RngCore}; + +use consensus::VerifySortOrder; +use core::hash::{Hash, Hashed}; +use core::id::{ShortId, ShortIdentifiable}; +use core::transaction::{Error, Transaction}; +use ser::{self, read_multi, Readable, Reader, Writeable, Writer}; + +/// A compact transaction body, wrapping a vec of kernel short_ids. +#[derive(Debug, Clone)] +pub struct CompactTransactionBody { + /// The vec of kernel short_ids that constitute the full transaction. + pub kern_ids: Vec, +} + +impl CompactTransactionBody { + fn init(kern_ids: Vec, verify_sorted: bool) -> Result { + let body = CompactTransactionBody { kern_ids }; + + if verify_sorted { + // If we are verifying sort order then verify and + // return an error if not sorted lexicographically. + body.verify_sorted()?; + Ok(body) + } else { + // If we are not verifying sort order then sort in place and return. + let mut body = body; + body.sort(); + Ok(body) + } + } + + /// Sort everything. + fn sort(&mut self) { + self.kern_ids.sort(); + } + + /// "Lightweight" validation. + fn validate_read(&self) -> Result<(), Error> { + self.verify_sorted()?; + Ok(()) + } + + // Verify everything is sorted in lexicographical order. + fn verify_sorted(&self) -> Result<(), Error> { + self.kern_ids.verify_sort_order()?; + Ok(()) + } +} + +impl Readable for CompactTransactionBody { + fn read(reader: &mut Reader) -> Result { + let kern_id_len = reader.read_u64()?; + let kern_ids = read_multi(reader, kern_id_len)?; + + // Initialize transaction transaction body, verifying sort order. + let body = + CompactTransactionBody::init(kern_ids, true).map_err(|_| ser::Error::CorruptedData)?; + + Ok(body) + } +} + +impl Writeable for CompactTransactionBody { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + writer.write_u64(self.kern_ids.len() as u64)?; + self.kern_ids.write(writer)?; + Ok(()) + } +} + +impl Into for CompactTransaction { + fn into(self) -> CompactTransactionBody { + self.body + } +} + +/// A CompactTransaction is a vec of kernel short_ids with the +/// associated tx hash and nonce to allow kernels to be rehashed +/// and compared against these short_ids. +#[derive(Debug, Clone)] +pub struct CompactTransaction { + /// Hash of the latest block header (used as part of short_id generation). + pub tx_hash: Hash, + /// Nonce for connection specific short_ids. + pub nonce: u64, + /// Container for kern_ids in the compact transaction. + body: CompactTransactionBody, +} + +impl CompactTransaction { + /// "Lightweight" validation. + fn validate_read(&self) -> Result<(), Error> { + self.body.validate_read()?; + Ok(()) + } + + /// Get kern_ids. + pub fn kern_ids(&self) -> &Vec { + &self.body.kern_ids + } + + /// The hash of the compact transaction is the hash of the underlying transaction. + /// TODO - is this wise? + pub fn hash(&self) -> Hash { + self.tx_hash + } +} + +impl From for CompactTransaction { + fn from(tx: Transaction) -> Self { + // TODO - Are we ok using the tx as the source of the hash for generating the short_ids? + let tx_hash = tx.hash(); + + // Generate a random nonce (short_ids specific to a particular peer connection). + let nonce = thread_rng().next_u64(); + + let mut kern_ids = vec![]; + + for k in tx.kernels() { + kern_ids.push(k.short_id(&tx_hash, nonce)); + } + + // Initialize a compact transaction body and sort everything. + let body = CompactTransactionBody::init(kern_ids, false).expect("sorting, not verifying"); + + CompactTransaction { + tx_hash, + nonce, + body, + } + } +} + +impl Writeable for CompactTransaction { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.tx_hash.write(writer)?; + writer.write_u64(self.nonce)?; + self.body.write(writer)?; + Ok(()) + } +} + +impl Readable for CompactTransaction { + fn read(reader: &mut Reader) -> Result { + let tx_hash = Hash::read(reader)?; + let nonce = reader.read_u64()?; + let body = CompactTransactionBody::read(reader)?; + + let compact_tx = CompactTransaction { + tx_hash, + nonce, + body, + }; + + // Now validate the compact transaction and treat any validation error as corrupted data. + compact_tx + .validate_read() + .map_err(|_| ser::Error::CorruptedData)?; + + Ok(compact_tx) + } +} diff --git a/core/src/core/id.rs b/core/src/core/id.rs index 21c0a8f77..4626b5109 100644 --- a/core/src/core/id.rs +++ b/core/src/core/id.rs @@ -70,7 +70,7 @@ impl ShortIdentifiable for H { } /// Short id for identifying inputs/outputs/kernels -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, Hash)] pub struct ShortId([u8; 6]); /// We want to sort short_ids in a canonical and consistent manner so we can @@ -170,9 +170,9 @@ mod test { let foo = Foo(0); - let expected_hash = Hash::from_hex( - "81e47a19e6b29b0a65b9591762ce5143ed30d0261e5d24a3201752506b20f15c", - ).unwrap(); + let expected_hash = + Hash::from_hex("81e47a19e6b29b0a65b9591762ce5143ed30d0261e5d24a3201752506b20f15c") + .unwrap(); assert_eq!(foo.hash(), expected_hash); let other_hash = Hash::default(); @@ -182,9 +182,9 @@ mod test { ); let foo = Foo(5); - let expected_hash = Hash::from_hex( - "3a42e66e46dd7633b57d1f921780a1ac715e6b93c19ee52ab714178eb3a9f673", - ).unwrap(); + let expected_hash = + Hash::from_hex("3a42e66e46dd7633b57d1f921780a1ac715e6b93c19ee52ab714178eb3a9f673") + .unwrap(); assert_eq!(foo.hash(), expected_hash); let other_hash = Hash::default(); @@ -194,14 +194,14 @@ mod test { ); let foo = Foo(5); - let expected_hash = Hash::from_hex( - "3a42e66e46dd7633b57d1f921780a1ac715e6b93c19ee52ab714178eb3a9f673", - ).unwrap(); + let expected_hash = + Hash::from_hex("3a42e66e46dd7633b57d1f921780a1ac715e6b93c19ee52ab714178eb3a9f673") + .unwrap(); assert_eq!(foo.hash(), expected_hash); - let other_hash = Hash::from_hex( - "81e47a19e6b29b0a65b9591762ce5143ed30d0261e5d24a3201752506b20f15c", - ).unwrap(); + let other_hash = + Hash::from_hex("81e47a19e6b29b0a65b9591762ce5143ed30d0261e5d24a3201752506b20f15c") + .unwrap(); assert_eq!( foo.short_id(&other_hash, foo.0), ShortId::from_hex("3e9cde72a687").unwrap() diff --git a/core/src/core/mod.rs b/core/src/core/mod.rs index 4e1d215ed..53cd98ed8 100644 --- a/core/src/core/mod.rs +++ b/core/src/core/mod.rs @@ -17,6 +17,7 @@ pub mod block; pub mod committed; pub mod compact_block; +pub mod compact_transaction; pub mod hash; pub mod id; pub mod merkle_proof; @@ -35,6 +36,7 @@ use util::secp::pedersen::Commitment; pub use self::block::*; pub use self::committed::Committed; pub use self::compact_block::*; +pub use self::compact_transaction::*; pub use self::id::ShortId; pub use self::transaction::*; use core::hash::Hashed; diff --git a/core/src/core/pmmr.rs b/core/src/core/pmmr.rs index 0d3fef8ac..aa50feecd 100644 --- a/core/src/core/pmmr.rs +++ b/core/src/core/pmmr.rs @@ -158,8 +158,7 @@ where // here we want to get from underlying hash file // as the pos *may* have been "removed" self.backend.get_from_file(pi) - }) - .collect() + }).collect() } fn peak_path(&self, peak_pos: u64) -> Vec { diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index c2c65a339..bd4e9720e 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -1284,9 +1284,9 @@ mod test { commit: commit, }; - let block_hash = Hash::from_hex( - "3a42e66e46dd7633b57d1f921780a1ac715e6b93c19ee52ab714178eb3a9f673", - ).unwrap(); + let block_hash = + Hash::from_hex("3a42e66e46dd7633b57d1f921780a1ac715e6b93c19ee52ab714178eb3a9f673") + .unwrap(); let nonce = 0; diff --git a/core/src/core/verifier_cache.rs b/core/src/core/verifier_cache.rs index d2bcd7ee9..72a2e9c68 100644 --- a/core/src/core/verifier_cache.rs +++ b/core/src/core/verifier_cache.rs @@ -69,8 +69,7 @@ impl VerifierCache for LruVerifierCache { .kernel_sig_verification_cache .get_mut(&x.hash()) .unwrap_or(&mut false) - }) - .cloned() + }).cloned() .collect::>(); debug!( LOGGER, @@ -89,8 +88,7 @@ impl VerifierCache for LruVerifierCache { .rangeproof_verification_cache .get_mut(&x.proof.hash()) .unwrap_or(&mut false) - }) - .cloned() + }).cloned() .collect::>(); debug!( LOGGER, diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 387bf93bf..df1d2583b 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -69,6 +69,8 @@ enum_from_primitive! { TxHashSetRequest = 16, TxHashSetArchive = 17, BanReason = 18, + // GetTransaction = 19, + // CompactTransaction = 20, } } diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 03565853e..eed202079 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -20,10 +20,10 @@ use std::sync::{Arc, RwLock}; use core::consensus; use core::core::hash::{Hash, Hashed}; -use core::core::id::ShortIdentifiable; +use core::core::id::{ShortId, ShortIdentifiable}; use core::core::transaction; use core::core::verifier_cache::VerifierCache; -use core::core::{Block, CompactBlock, Transaction, TxKernel}; +use core::core::{Block, Transaction, TxKernel}; use types::{BlockChain, PoolEntry, PoolEntryState, PoolError}; use util::LOGGER; @@ -58,30 +58,53 @@ impl Pool { } /// Does the transaction pool contain an entry for the given transaction? - pub fn contains_tx(&self, tx: &Transaction) -> bool { - self.entries.iter().any(|x| x.tx.hash() == tx.hash()) + pub fn contains_tx(&self, hash: Hash) -> bool { + self.entries.iter().any(|x| x.tx.hash() == hash) + } + + pub fn get_tx(&self, hash: Hash) -> Option { + self.entries + .iter() + .find(|x| x.tx.hash() == hash) + .map(|x| x.tx.clone()) } /// Query the tx pool for all known txs based on kernel short_ids /// from the provided compact_block. /// Note: does not validate that we return the full set of required txs. /// The caller will need to validate that themselves. - pub fn retrieve_transactions(&self, cb: &CompactBlock) -> Vec { - let mut txs = vec![]; + pub fn retrieve_transactions( + &self, + hash: Hash, + nonce: u64, + kern_ids: &Vec, + ) -> (Vec, Vec) { + let mut rehashed = HashMap::new(); + // Rehash all entries in the pool using short_ids based on provided hash and nonce. for x in &self.entries { - for kernel in x.tx.kernels() { + for k in x.tx.kernels() { // rehash each kernel to calculate the block specific short_id - let short_id = kernel.short_id(&cb.hash(), cb.nonce); - - // if any kernel matches then keep the tx for later - if cb.kern_ids().contains(&short_id) { - txs.push(x.tx.clone()); - break; - } + let short_id = k.short_id(&hash, nonce); + rehashed.insert(short_id, x.tx.hash()); } } - txs + + // Retrive the txs from the pool by the set of unique hashes. + let hashes: HashSet<_> = rehashed.values().collect(); + let txs = hashes.into_iter().filter_map(|x| self.get_tx(*x)).collect(); + + // Calculate the missing ids based on the ids passed in + // and the ids that successfully matched txs. + let matched_ids: HashSet<_> = rehashed.keys().collect(); + let all_ids: HashSet<_> = kern_ids.iter().collect(); + let missing_ids = all_ids + .difference(&matched_ids) + .map(|x| *x) + .cloned() + .collect(); + + (txs, missing_ids) } /// Take pool transactions, filtering and ordering them in a way that's @@ -99,8 +122,7 @@ impl Pool { .filter_map(|mut bucket| { bucket.truncate(MAX_TX_CHAIN); transaction::aggregate(bucket, self.verifier_cache.clone()).ok() - }) - .collect(); + }).collect(); // sort by fees over weight, multiplying by 1000 to keep some precision // don't think we'll ever see a >max_u64/1000 fee transaction diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index ce9b3ab5f..ab1b3d97e 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -21,9 +21,10 @@ use std::sync::{Arc, RwLock}; use chrono::prelude::Utc; -use core::core::hash::Hash; +use core::core::hash::{Hash, Hashed}; +use core::core::id::ShortId; use core::core::verifier_cache::VerifierCache; -use core::core::{transaction, Block, CompactBlock, Transaction}; +use core::core::{transaction, Block, Transaction}; use pool::Pool; use types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource}; @@ -103,7 +104,7 @@ impl TransactionPool { ) -> Result<(), PoolError> { // Quick check to deal with common case of seeing the *same* tx // broadcast from multiple peers simultaneously. - if !stem && self.txpool.contains_tx(&tx) { + if !stem && self.txpool.contains_tx(tx.hash()) { return Err(PoolError::DuplicateTx); } @@ -153,8 +154,13 @@ impl TransactionPool { /// Retrieve all transactions matching the provided "compact block" /// based on the kernel set. /// Note: we only look in the txpool for this (stempool is under embargo). - pub fn retrieve_transactions(&self, cb: &CompactBlock) -> Vec { - self.txpool.retrieve_transactions(cb) + pub fn retrieve_transactions( + &self, + hash: Hash, + nonce: u64, + kern_ids: &Vec, + ) -> (Vec, Vec) { + self.txpool.retrieve_transactions(hash, nonce, kern_ids) } /// Whether the transaction is acceptable to the pool, given both how diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 93f6b630c..0d418bac7 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -146,12 +146,17 @@ impl p2p::ChainAdapter for NetToChainAdapter { return !e.is_bad_data(); } - let txs = { + let (txs, missing_short_ids) = { let tx_pool = self.tx_pool.read().unwrap(); - tx_pool.retrieve_transactions(&cb) + tx_pool.retrieve_transactions(cb.hash(), cb.nonce, cb.kern_ids()) }; - debug!(LOGGER, "adapter: txs from tx pool - {}", txs.len(),); + debug!( + LOGGER, + "adapter: txs from tx pool - {}, (unknown kern_ids: {})", + txs.len(), + missing_short_ids.len(), + ); // TODO - 3 scenarios here - // 1) we hydrate a valid block (good to go) @@ -177,8 +182,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { &prev.total_kernel_offset, &prev.total_kernel_sum, self.verifier_cache.clone(), - ) - .is_ok() + ).is_ok() { debug!(LOGGER, "adapter: successfully hydrated block from tx pool!"); self.process_block(block, addr) @@ -450,10 +454,9 @@ impl NetToChainAdapter { let head = chain.head().unwrap(); // we have a fast sync'd node and are sent a block older than our horizon, // only sync can do something with that - if b.header.height - < head - .height - .saturating_sub(global::cut_through_horizon() as u64) + if b.header.height < head + .height + .saturating_sub(global::cut_through_horizon() as u64) { return true; }