From eb9cc7ef13b40b9ac0f4b13fb9b2a1d108064240 Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sat, 10 Jun 2017 12:51:33 -0700 Subject: [PATCH] Integrate transaction pool with rest of the system * Transactions coming from the network are now pushed to the pool through the net adapter. * New blocks accepted by the chain are sent to the pool for eviction. * The miner requests transactions from the pool to build its blocks. * The push API adds to the pool, removing the mock. * Implementation of the adapter to the chain required by the pool to get consistent UTXOs. Grossly unoptimized until we have the UTXO MMR ready. --- api/Cargo.toml | 1 + api/src/endpoints.rs | 49 ++++++++++++++---- api/src/lib.rs | 1 + core/src/core/block.rs | 6 +-- grin/Cargo.toml | 1 + grin/src/adapters.rs | 109 ++++++++++++++++++++++++++++++++++++----- grin/src/lib.rs | 1 + grin/src/miner.rs | 57 ++++++++++++--------- grin/src/server.rs | 24 ++++++--- pool/src/lib.rs | 2 +- pool/src/pool.rs | 7 +-- store/src/lib.rs | 11 +++-- wallet/src/checker.rs | 1 - 13 files changed, 207 insertions(+), 63 deletions(-) diff --git a/api/Cargo.toml b/api/Cargo.toml index 9465f880e..708c3c9a7 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -7,6 +7,7 @@ workspace = ".." [dependencies] grin_core = { path = "../core" } grin_chain = { path = "../chain" } +grin_pool = { path = "../pool" } grin_util = { path = "../util" } secp256k1zkp = { path = "../secp256k1zkp" } diff --git a/api/src/endpoints.rs b/api/src/endpoints.rs index e9105775e..c0f8b3cb7 100644 --- a/api/src/endpoints.rs +++ b/api/src/endpoints.rs @@ -21,13 +21,14 @@ // } // } -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread; use core::core::{Transaction, Output}; use core::core::hash::Hash; use core::ser; use chain::{self, Tip}; +use pool; use rest::*; use secp::pedersen::Commitment; use util; @@ -85,17 +86,36 @@ impl ApiEndpoint for OutputApi { /// ApiEndpoint implementation for the transaction pool, to check its status /// and size as well as push new transactions. #[derive(Clone)] -pub struct PoolApi { +pub struct PoolApi { + tx_pool: Arc>>, } -impl ApiEndpoint for PoolApi { +#[derive(Serialize, Deserialize)] +struct PoolInfo { + pool_size: usize, + orphans_size: usize, + total_size: usize, +} + +impl ApiEndpoint for PoolApi + where T: pool::BlockChain + Clone + Send + Sync + 'static +{ type ID = String; - type T = (); + type T = PoolInfo; type OP_IN = TxWrapper; type OP_OUT = (); fn operations(&self) -> Vec { - vec![Operation::Custom("push".to_string())] + vec![Operation::Get, Operation::Custom("push".to_string())] + } + + fn get(&self, id: String) -> ApiResult { + let pool = self.tx_pool.read().unwrap(); + Ok(PoolInfo { + pool_size: pool.pool_size(), + orphans_size: pool.orphans_size(), + total_size: pool.total_size(), + }) } fn operation(&self, op: String, input: TxWrapper) -> ApiResult<()> { @@ -106,8 +126,15 @@ impl ApiEndpoint for PoolApi { Error::Argument("Could not deserialize transaction, invalid format.".to_string()) })?; - println!("Fake push of transaction:"); - println!("{:?}", tx); + let source = pool::TxSource { + debug_name: "push-api".to_string(), + identifier: "?.?.?.?".to_string(), + }; + self.tx_pool + .write() + .unwrap() + .add_to_memory_pool(source, tx) + .map_err(|e| Error::Internal(format!("Addition to transaction pool failed: {:?}", e)))?; Ok(()) } } @@ -120,7 +147,11 @@ struct TxWrapper { /// Start all server REST APIs. Just register all of them on a ApiServer /// instance and runs the corresponding HTTP server. -pub fn start_rest_apis(addr: String, chain_store: Arc) { +pub fn start_rest_apis(addr: String, + chain_store: Arc, + tx_pool: Arc>>) + where T: pool::BlockChain + Clone + Send + Sync + 'static +{ thread::spawn(move || { let mut apis = ApiServer::new("/v1".to_string()); @@ -128,7 +159,7 @@ pub fn start_rest_apis(addr: String, chain_store: Arc) { ChainApi { chain_store: chain_store.clone() }); apis.register_endpoint("/chain/output".to_string(), OutputApi { chain_store: chain_store.clone() }); - apis.register_endpoint("/pool".to_string(), PoolApi {}); + apis.register_endpoint("/pool".to_string(), PoolApi { tx_pool: tx_pool }); apis.start(&addr[..]).unwrap_or_else(|e| { error!("Failed to start API HTTP server: {}.", e); diff --git a/api/src/lib.rs b/api/src/lib.rs index 9492d16e2..8109795a1 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -14,6 +14,7 @@ extern crate grin_core as core; extern crate grin_chain as chain; +extern crate grin_pool as pool; extern crate grin_util as util; extern crate secp256k1zkp as secp; diff --git a/core/src/core/block.rs b/core/src/core/block.rs index a9cf1f924..58530d41d 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -230,7 +230,7 @@ impl Block { /// transactions and the private key that will receive the reward. Checks /// that all transactions are valid and calculates the Merkle tree. pub fn new(prev: &BlockHeader, - txs: Vec<&mut Transaction>, + txs: Vec<&Transaction>, reward_key: SecretKey) -> Result { @@ -244,7 +244,7 @@ impl Block { /// a vector of transactions and the reward information. Checks /// that all transactions are valid and calculates the Merkle tree. pub fn with_reward(prev: &BlockHeader, - txs: Vec<&mut Transaction>, + txs: Vec<&Transaction>, reward_out: Output, reward_kern: TxKernel) -> Result { @@ -493,7 +493,7 @@ mod test { // utility to create a block without worrying about the key or previous // header - fn new_block(txs: Vec<&mut Transaction>, secp: &Secp256k1) -> Block { + fn new_block(txs: Vec<&Transaction>, secp: &Secp256k1) -> Block { let mut rng = OsRng::new().unwrap(); let skey = SecretKey::new(secp, &mut rng); Block::new(&BlockHeader::default(), txs, skey).unwrap() diff --git a/grin/Cargo.toml b/grin/Cargo.toml index 8afbdd320..8f40b3d43 100644 --- a/grin/Cargo.toml +++ b/grin/Cargo.toml @@ -10,6 +10,7 @@ grin_chain = { path = "../chain" } grin_core = { path = "../core" } grin_store = { path = "../store" } grin_p2p = { path = "../p2p" } +grin_pool = { path = "../pool" } grin_util = { path = "../util" } secp256k1zkp = { path = "../secp256k1zkp" } diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 8aa1edf72..cb53d8da3 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -14,14 +14,16 @@ use std::net::SocketAddr; use std::ops::Deref; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; use chain::{self, ChainAdapter}; -use core::core; +use core::core::{self, Output}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use p2p::{self, NetAdapter, Server, PeerStore, PeerData, Capabilities, State}; +use pool; +use secp::pedersen::Commitment; use util::OneTime; use store; use sync; @@ -33,8 +35,9 @@ pub struct NetToChainAdapter { /// the reference copy of the current chain state chain_head: Arc>, chain_store: Arc, - chain_adapter: Arc, + chain_adapter: Arc, peer_store: Arc, + tx_pool: Arc>>, syncer: OneTime>, } @@ -45,7 +48,13 @@ impl NetAdapter for NetToChainAdapter { } fn transaction_received(&self, tx: core::Transaction) { - unimplemented!(); + let source = pool::TxSource { + debug_name: "p2p".to_string(), + identifier: "?.?.?.?".to_string(), + }; + if let Err(e) = self.tx_pool.write().unwrap().add_to_memory_pool(source, tx) { + error!("Transaction rejected: {:?}", e); + } } fn block_received(&self, b: core::Block) { @@ -209,7 +218,8 @@ impl NetAdapter for NetToChainAdapter { impl NetToChainAdapter { pub fn new(chain_head: Arc>, chain_store: Arc, - chain_adapter: Arc, + chain_adapter: Arc, + tx_pool: Arc>>, peer_store: Arc) -> NetToChainAdapter { NetToChainAdapter { @@ -217,6 +227,7 @@ impl NetToChainAdapter { chain_store: chain_store, chain_adapter: chain_adapter, peer_store: peer_store, + tx_pool: tx_pool, syncer: OneTime::new(), } } @@ -231,23 +242,97 @@ impl NetToChainAdapter { } /// Implementation of the ChainAdapter for the network. Gets notified when the -/// blockchain accepted a new block and forwards it to the network for -/// broadcast. -pub struct ChainToNetAdapter { +/// blockchain accepted a new block, asking the pool to update its state and +/// the network to broadcast the block +pub struct ChainToPoolAndNetAdapter { + tx_pool: Arc>>, p2p: OneTime>, } -impl ChainAdapter for ChainToNetAdapter { +impl ChainAdapter for ChainToPoolAndNetAdapter { fn block_accepted(&self, b: &core::Block) { + { + if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) { + error!("Pool could not update itself at block {}: {:?}", + b.hash(), + e); + } + } self.p2p.borrow().broadcast_block(b); } } -impl ChainToNetAdapter { - pub fn new() -> ChainToNetAdapter { - ChainToNetAdapter { p2p: OneTime::new() } +impl ChainToPoolAndNetAdapter { + pub fn new(tx_pool: Arc>>) + -> ChainToPoolAndNetAdapter { + ChainToPoolAndNetAdapter { + tx_pool: tx_pool, + p2p: OneTime::new(), + } } pub fn init(&self, p2p: Arc) { self.p2p.init(p2p); } } + +/// Implements the view of the blockchain required by the TransactionPool to +/// operate. This is mostly getting information on unspent outputs in a +/// manner consistent with the chain state. +#[derive(Clone)] +pub struct PoolToChainAdapter { + chain_head: Arc>, + chain_store: Arc, +} + +macro_rules! none_err { + ($trying:expr) => {{ + let tried = $trying; + if let Err(_) = tried { + return None; + } + tried.unwrap() + }} +} + +impl PoolToChainAdapter { + /// Create a new pool adapter + pub fn new(chain_head: Arc>, + chain_store: Arc) + -> PoolToChainAdapter { + PoolToChainAdapter { + chain_head: chain_head, + chain_store: chain_store, + } + } +} + +impl pool::BlockChain for PoolToChainAdapter { + fn get_unspent(&self, output_ref: &Commitment) -> Option { + // TODO use an actual UTXO tree + // in the meantime doing it the *very* expensive way: + // 1. check the output exists + // 2. run the chain back from the head to check it hasn't been spent + if let Ok(out) = self.chain_store.get_output_by_commit(output_ref) { + let mut block_h: Hash; + { + let chain_head = self.chain_head.clone(); + let head = chain_head.lock().unwrap(); + block_h = head.last_block_h; + } + loop { + let b = none_err!(self.chain_store.get_block(&block_h)); + for input in b.inputs { + if input.commitment() == *output_ref { + return None; + } + } + if b.header.height == 1 { + return Some(out); + } else { + block_h = b.header.previous; + } + } + } + None + } +} diff --git a/grin/src/lib.rs b/grin/src/lib.rs index fb53bd584..7477c7369 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -40,6 +40,7 @@ extern crate grin_chain as chain; #[macro_use] extern crate grin_core as core; extern crate grin_p2p as p2p; +extern crate grin_pool as pool; extern crate grin_store as store; extern crate grin_util as util; extern crate secp256k1zkp as secp; diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 3c27b65de..157bb78aa 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -16,10 +16,10 @@ //! block and mine the block to produce a valid header with its proof-of-work. use rand::{self, Rng}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use time; -use adapters::ChainToNetAdapter; +use adapters::{ChainToPoolAndNetAdapter, PoolToChainAdapter}; use api; use core::consensus; use core::core; @@ -28,15 +28,20 @@ use core::pow::cuckoo; use core::ser; use chain; use secp; +use pool; use types::{MinerConfig, Error}; use util; +// Max number of transactions this miner will assemble in a block +const MAX_TX: u32 = 5000; + pub struct Miner { config: MinerConfig, chain_head: Arc>, chain_store: Arc, /// chain adapter to net - chain_adapter: Arc, + chain_adapter: Arc, + tx_pool: Arc>>, } impl Miner { @@ -45,13 +50,15 @@ impl Miner { pub fn new(config: MinerConfig, chain_head: Arc>, chain_store: Arc, - chain_adapter: Arc) + chain_adapter: Arc, + tx_pool: Arc>>) -> Miner { Miner { config: config, chain_head: chain_head, chain_store: chain_store, chain_adapter: chain_adapter, + tx_pool: tx_pool, } } @@ -59,7 +66,7 @@ impl Miner { /// chain anytime required and looking for PoW solution. pub fn run_loop(&self) { info!("Starting miner loop."); - let mut coinbase = self.get_coinbase(); + let mut coinbase = self.get_coinbase(); loop { // get the latest chain state and build a block on top of it let head: core::BlockHeader; @@ -77,7 +84,7 @@ impl Miner { debug!("Mining at Cuckoo{} for at most 2 secs on block {} at difficulty {}.", b.header.cuckoo_len, latest_hash, - b.header.difficulty); + b.header.difficulty); let mut iter_count = 0; while head.hash() == latest_hash && time::get_time().sec < deadline { let pow_hash = b.hash(); @@ -110,7 +117,7 @@ impl Miner { } else if let Ok(Some(tip)) = res { let chain_head = self.chain_head.clone(); let mut head = chain_head.lock().unwrap(); - coinbase = self.get_coinbase(); + coinbase = self.get_coinbase(); *head = tip; } } else { @@ -122,7 +129,10 @@ impl Miner { /// Builds a new block with the chain head as previous and eligible /// transactions from the pool. - fn build_block(&self, head: &core::BlockHeader, coinbase: (core::Output, core::TxKernel)) -> core::Block { + fn build_block(&self, + head: &core::BlockHeader, + coinbase: (core::Output, core::TxKernel)) + -> core::Block { let mut now_sec = time::get_time().sec; let head_sec = head.timestamp.to_timespec().sec; if now_sec == head_sec { @@ -131,9 +141,10 @@ impl Miner { let (difficulty, cuckoo_len) = consensus::next_target(now_sec, head_sec, head.difficulty.clone(), head.cuckoo_len); - // TODO populate inputs and outputs from pool transactions - let (output, kernel) = coinbase; - let mut b = core::Block::with_reward(head, vec![], output, kernel).unwrap(); + let txs_box = self.tx_pool.read().unwrap().prepare_mineable_transactions(MAX_TX); + let txs = txs_box.iter().map(|tx| tx.as_ref()).collect(); + let (output, kernel) = coinbase; + let mut b = core::Block::with_reward(head, txs, output, kernel).unwrap(); let mut rng = rand::OsRng::new().unwrap(); b.header.nonce = rng.gen(); @@ -145,21 +156,23 @@ impl Miner { fn get_coinbase(&self) -> (core::Output, core::TxKernel) { if self.config.burn_reward { - let mut rng = rand::OsRng::new().unwrap(); + let mut rng = rand::OsRng::new().unwrap(); let secp_inst = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let skey = secp::key::SecretKey::new(&secp_inst, &mut rng); - core::Block::reward_output(skey, &secp_inst).unwrap() - } else { - let url = format!("{}/v1/receive_coinbase", self.config.wallet_receiver_url.as_str()); - let res: CbData = api::client::post(url.as_str(), &CbAmount { amount: consensus::REWARD }) + core::Block::reward_output(skey, &secp_inst).unwrap() + } else { + let url = format!("{}/v1/receive_coinbase", + self.config.wallet_receiver_url.as_str()); + let res: CbData = api::client::post(url.as_str(), + &CbAmount { amount: consensus::REWARD }) .expect("Wallet receiver unreachable, could not claim reward. Is it running?"); - let out_bin = util::from_hex(res.output).unwrap(); - let kern_bin = util::from_hex(res.kernel).unwrap(); - let output = ser::deserialize(&mut &out_bin[..]).unwrap(); - let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap(); + let out_bin = util::from_hex(res.output).unwrap(); + let kern_bin = util::from_hex(res.kernel).unwrap(); + let output = ser::deserialize(&mut &out_bin[..]).unwrap(); + let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap(); - (output, kernel) - } + (output, kernel) + } } } diff --git a/grin/src/server.rs b/grin/src/server.rs index 59d28afb8..e03c6b08c 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -17,7 +17,7 @@ //! as a facade. use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time; @@ -25,7 +25,7 @@ use futures::{future, Future, Stream}; use tokio_core::reactor; use tokio_timer::Timer; -use adapters::{NetToChainAdapter, ChainToNetAdapter}; +use adapters::*; use api; use chain; use chain::ChainStore; @@ -33,6 +33,7 @@ use core; use core::core::hash::Hashed; use miner; use p2p; +use pool; use seed; use store; use sync; @@ -50,7 +51,9 @@ pub struct Server { chain_store: Arc, /// chain adapter to net, required for miner and anything that submits /// blocks - chain_adapter: Arc, + chain_adapter: Arc, + /// in-memory transaction pool + tx_pool: Arc>>, } impl Server { @@ -82,10 +85,15 @@ impl Server { let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); - let chain_adapter = Arc::new(ChainToNetAdapter::new()); + let pool_adapter = Arc::new(PoolToChainAdapter::new(shared_head.clone(), + chain_store.clone())); + let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(pool_adapter))); + + let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), chain_store.clone(), chain_adapter.clone(), + tx_pool.clone(), peer_store.clone())); let server = Arc::new(p2p::Server::new(config.capabilities, config.p2p_config, net_adapter.clone())); @@ -107,7 +115,9 @@ impl Server { evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); - api::start_rest_apis(config.api_http_addr.clone(), chain_store.clone()); + api::start_rest_apis(config.api_http_addr.clone(), + chain_store.clone(), + tx_pool.clone()); warn!("Grin server started."); Ok(Server { @@ -117,6 +127,7 @@ impl Server { chain_head: shared_head, chain_store: chain_store, chain_adapter: chain_adapter, + tx_pool: tx_pool, }) } @@ -137,7 +148,8 @@ impl Server { let miner = miner::Miner::new(config, self.chain_head.clone(), self.chain_store.clone(), - self.chain_adapter.clone()); + self.chain_adapter.clone(), + self.tx_pool.clone()); thread::spawn(move || { miner.run_loop(); }); diff --git a/pool/src/lib.rs b/pool/src/lib.rs index 634b0fa4f..e3478f9ee 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -35,4 +35,4 @@ extern crate grin_core as core; extern crate secp256k1zkp as secp; pub use pool::TransactionPool; -pub use types::{BlockChain, PoolError}; +pub use types::{BlockChain, TxSource, PoolError}; diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 841d49929..ae4254c69 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -43,7 +43,7 @@ pub struct TransactionPool { } impl TransactionPool where T: BlockChain { - fn new(chain: Arc) -> TransactionPool { + pub fn new(chain: Arc) -> TransactionPool { TransactionPool{ transactions: HashMap::new(), pool: Pool::empty(), @@ -380,8 +380,6 @@ impl TransactionPool where T: BlockChain { conflicting_txs.append(&mut conflicting_outputs); - println!("Conflicting txs: {:?}", conflicting_txs); - for txh in conflicting_txs { self.mark_transaction(txh, &mut marked_transactions); } @@ -436,7 +434,6 @@ impl TransactionPool where T: BlockChain { marked_transactions: HashMap,) ->Vec> { - println!("marked_txs: {:?}", marked_transactions); let mut removed_txs = Vec::new(); for tx_hash in marked_transactions.keys() { @@ -876,8 +873,6 @@ mod tests { } tx_elements.push(build::with_fee(fees as u64)); - println!("Fee was {}", fees as u64); - let (tx, _) = build::transaction(tx_elements).unwrap(); tx } diff --git a/store/src/lib.rs b/store/src/lib.rs index 887718597..2df5e4bbd 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -116,9 +116,9 @@ impl Store { /// extract only partial data. The underlying Readable size must align /// accordingly. Encapsulates serialization. pub fn get_ser_limited(&self, - key: &[u8], - len: usize) - -> Result, Error> { + key: &[u8], + len: usize) + -> Result, Error> { let data = try!(self.get(key)); match data { Some(val) => { @@ -187,6 +187,11 @@ impl<'a> Batch<'a> { } } + pub fn delete(mut self, key: &[u8]) -> Result, Error> { + self.batch.delete(key)?; + Ok(self) + } + /// Writes the batch to RocksDb. pub fn write(self) -> Result<(), Error> { self.store.write(self.batch) diff --git a/wallet/src/checker.rs b/wallet/src/checker.rs index 1154d3c3f..5535e0725 100644 --- a/wallet/src/checker.rs +++ b/wallet/src/checker.rs @@ -17,7 +17,6 @@ use api; use core::core::{Output, DEFAULT_OUTPUT, COINBASE_OUTPUT}; -use core::core::hash::Hashed; use secp::{self, pedersen}; use util;