Integrate transaction pool with rest of the system

* Transactions coming from the network are now pushed to the pool
through the net adapter.
* New blocks accepted by the chain are sent to the pool for
eviction.
* The miner requests transactions from the pool to build its
blocks.
* The push API adds to the pool, removing the mock.
* Implementation of the adapter to the chain required by the pool
to get consistent UTXOs. Grossly unoptimized until we have the UTXO
MMR ready.
This commit is contained in:
Ignotus Peverell 2017-06-10 12:51:33 -07:00
parent 172c5e840b
commit eb9cc7ef13
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
13 changed files with 207 additions and 63 deletions

View file

@ -7,6 +7,7 @@ workspace = ".."
[dependencies]
grin_core = { path = "../core" }
grin_chain = { path = "../chain" }
grin_pool = { path = "../pool" }
grin_util = { path = "../util" }
secp256k1zkp = { path = "../secp256k1zkp" }

View file

@ -21,13 +21,14 @@
// }
// }
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread;
use core::core::{Transaction, Output};
use core::core::hash::Hash;
use core::ser;
use chain::{self, Tip};
use pool;
use rest::*;
use secp::pedersen::Commitment;
use util;
@ -85,17 +86,36 @@ impl ApiEndpoint for OutputApi {
/// ApiEndpoint implementation for the transaction pool, to check its status
/// and size as well as push new transactions.
#[derive(Clone)]
pub struct PoolApi {
pub struct PoolApi<T> {
tx_pool: Arc<RwLock<pool::TransactionPool<T>>>,
}
impl ApiEndpoint for PoolApi {
#[derive(Serialize, Deserialize)]
struct PoolInfo {
pool_size: usize,
orphans_size: usize,
total_size: usize,
}
impl<T> ApiEndpoint for PoolApi<T>
where T: pool::BlockChain + Clone + Send + Sync + 'static
{
type ID = String;
type T = ();
type T = PoolInfo;
type OP_IN = TxWrapper;
type OP_OUT = ();
fn operations(&self) -> Vec<Operation> {
vec![Operation::Custom("push".to_string())]
vec![Operation::Get, Operation::Custom("push".to_string())]
}
fn get(&self, id: String) -> ApiResult<PoolInfo> {
let pool = self.tx_pool.read().unwrap();
Ok(PoolInfo {
pool_size: pool.pool_size(),
orphans_size: pool.orphans_size(),
total_size: pool.total_size(),
})
}
fn operation(&self, op: String, input: TxWrapper) -> ApiResult<()> {
@ -106,8 +126,15 @@ impl ApiEndpoint for PoolApi {
Error::Argument("Could not deserialize transaction, invalid format.".to_string())
})?;
println!("Fake push of transaction:");
println!("{:?}", tx);
let source = pool::TxSource {
debug_name: "push-api".to_string(),
identifier: "?.?.?.?".to_string(),
};
self.tx_pool
.write()
.unwrap()
.add_to_memory_pool(source, tx)
.map_err(|e| Error::Internal(format!("Addition to transaction pool failed: {:?}", e)))?;
Ok(())
}
}
@ -120,7 +147,11 @@ struct TxWrapper {
/// Start all server REST APIs. Just register all of them on a ApiServer
/// instance and runs the corresponding HTTP server.
pub fn start_rest_apis(addr: String, chain_store: Arc<chain::ChainStore>) {
pub fn start_rest_apis<T>(addr: String,
chain_store: Arc<chain::ChainStore>,
tx_pool: Arc<RwLock<pool::TransactionPool<T>>>)
where T: pool::BlockChain + Clone + Send + Sync + 'static
{
thread::spawn(move || {
let mut apis = ApiServer::new("/v1".to_string());
@ -128,7 +159,7 @@ pub fn start_rest_apis(addr: String, chain_store: Arc<chain::ChainStore>) {
ChainApi { chain_store: chain_store.clone() });
apis.register_endpoint("/chain/output".to_string(),
OutputApi { chain_store: chain_store.clone() });
apis.register_endpoint("/pool".to_string(), PoolApi {});
apis.register_endpoint("/pool".to_string(), PoolApi { tx_pool: tx_pool });
apis.start(&addr[..]).unwrap_or_else(|e| {
error!("Failed to start API HTTP server: {}.", e);

View file

@ -14,6 +14,7 @@
extern crate grin_core as core;
extern crate grin_chain as chain;
extern crate grin_pool as pool;
extern crate grin_util as util;
extern crate secp256k1zkp as secp;

View file

@ -230,7 +230,7 @@ impl Block {
/// transactions and the private key that will receive the reward. Checks
/// that all transactions are valid and calculates the Merkle tree.
pub fn new(prev: &BlockHeader,
txs: Vec<&mut Transaction>,
txs: Vec<&Transaction>,
reward_key: SecretKey)
-> Result<Block, secp::Error> {
@ -244,7 +244,7 @@ impl Block {
/// a vector of transactions and the reward information. Checks
/// that all transactions are valid and calculates the Merkle tree.
pub fn with_reward(prev: &BlockHeader,
txs: Vec<&mut Transaction>,
txs: Vec<&Transaction>,
reward_out: Output,
reward_kern: TxKernel)
-> Result<Block, secp::Error> {
@ -493,7 +493,7 @@ mod test {
// utility to create a block without worrying about the key or previous
// header
fn new_block(txs: Vec<&mut Transaction>, secp: &Secp256k1) -> Block {
fn new_block(txs: Vec<&Transaction>, secp: &Secp256k1) -> Block {
let mut rng = OsRng::new().unwrap();
let skey = SecretKey::new(secp, &mut rng);
Block::new(&BlockHeader::default(), txs, skey).unwrap()

View file

@ -10,6 +10,7 @@ grin_chain = { path = "../chain" }
grin_core = { path = "../core" }
grin_store = { path = "../store" }
grin_p2p = { path = "../p2p" }
grin_pool = { path = "../pool" }
grin_util = { path = "../util" }
secp256k1zkp = { path = "../secp256k1zkp" }

View file

@ -14,14 +14,16 @@
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use chain::{self, ChainAdapter};
use core::core;
use core::core::{self, Output};
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use p2p::{self, NetAdapter, Server, PeerStore, PeerData, Capabilities, State};
use pool;
use secp::pedersen::Commitment;
use util::OneTime;
use store;
use sync;
@ -33,8 +35,9 @@ pub struct NetToChainAdapter {
/// the reference copy of the current chain state
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>,
chain_adapter: Arc<ChainToPoolAndNetAdapter>,
peer_store: Arc<PeerStore>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
syncer: OneTime<Arc<sync::Syncer>>,
}
@ -45,7 +48,13 @@ impl NetAdapter for NetToChainAdapter {
}
fn transaction_received(&self, tx: core::Transaction) {
unimplemented!();
let source = pool::TxSource {
debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(),
};
if let Err(e) = self.tx_pool.write().unwrap().add_to_memory_pool(source, tx) {
error!("Transaction rejected: {:?}", e);
}
}
fn block_received(&self, b: core::Block) {
@ -209,7 +218,8 @@ impl NetAdapter for NetToChainAdapter {
impl NetToChainAdapter {
pub fn new(chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>,
chain_adapter: Arc<ChainToPoolAndNetAdapter>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peer_store: Arc<PeerStore>)
-> NetToChainAdapter {
NetToChainAdapter {
@ -217,6 +227,7 @@ impl NetToChainAdapter {
chain_store: chain_store,
chain_adapter: chain_adapter,
peer_store: peer_store,
tx_pool: tx_pool,
syncer: OneTime::new(),
}
}
@ -231,23 +242,97 @@ impl NetToChainAdapter {
}
/// Implementation of the ChainAdapter for the network. Gets notified when the
/// blockchain accepted a new block and forwards it to the network for
/// broadcast.
pub struct ChainToNetAdapter {
/// blockchain accepted a new block, asking the pool to update its state and
/// the network to broadcast the block
pub struct ChainToPoolAndNetAdapter {
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
p2p: OneTime<Arc<Server>>,
}
impl ChainAdapter for ChainToNetAdapter {
impl ChainAdapter for ChainToPoolAndNetAdapter {
fn block_accepted(&self, b: &core::Block) {
{
if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) {
error!("Pool could not update itself at block {}: {:?}",
b.hash(),
e);
}
}
self.p2p.borrow().broadcast_block(b);
}
}
impl ChainToNetAdapter {
pub fn new() -> ChainToNetAdapter {
ChainToNetAdapter { p2p: OneTime::new() }
impl ChainToPoolAndNetAdapter {
pub fn new(tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>)
-> ChainToPoolAndNetAdapter {
ChainToPoolAndNetAdapter {
tx_pool: tx_pool,
p2p: OneTime::new(),
}
}
pub fn init(&self, p2p: Arc<Server>) {
self.p2p.init(p2p);
}
}
/// Implements the view of the blockchain required by the TransactionPool to
/// operate. This is mostly getting information on unspent outputs in a
/// manner consistent with the chain state.
#[derive(Clone)]
pub struct PoolToChainAdapter {
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
}
macro_rules! none_err {
($trying:expr) => {{
let tried = $trying;
if let Err(_) = tried {
return None;
}
tried.unwrap()
}}
}
impl PoolToChainAdapter {
/// Create a new pool adapter
pub fn new(chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>)
-> PoolToChainAdapter {
PoolToChainAdapter {
chain_head: chain_head,
chain_store: chain_store,
}
}
}
impl pool::BlockChain for PoolToChainAdapter {
fn get_unspent(&self, output_ref: &Commitment) -> Option<Output> {
// TODO use an actual UTXO tree
// in the meantime doing it the *very* expensive way:
// 1. check the output exists
// 2. run the chain back from the head to check it hasn't been spent
if let Ok(out) = self.chain_store.get_output_by_commit(output_ref) {
let mut block_h: Hash;
{
let chain_head = self.chain_head.clone();
let head = chain_head.lock().unwrap();
block_h = head.last_block_h;
}
loop {
let b = none_err!(self.chain_store.get_block(&block_h));
for input in b.inputs {
if input.commitment() == *output_ref {
return None;
}
}
if b.header.height == 1 {
return Some(out);
} else {
block_h = b.header.previous;
}
}
}
None
}
}

View file

@ -40,6 +40,7 @@ extern crate grin_chain as chain;
#[macro_use]
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate grin_pool as pool;
extern crate grin_store as store;
extern crate grin_util as util;
extern crate secp256k1zkp as secp;

View file

@ -16,10 +16,10 @@
//! block and mine the block to produce a valid header with its proof-of-work.
use rand::{self, Rng};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use time;
use adapters::ChainToNetAdapter;
use adapters::{ChainToPoolAndNetAdapter, PoolToChainAdapter};
use api;
use core::consensus;
use core::core;
@ -28,15 +28,20 @@ use core::pow::cuckoo;
use core::ser;
use chain;
use secp;
use pool;
use types::{MinerConfig, Error};
use util;
// Max number of transactions this miner will assemble in a block
const MAX_TX: u32 = 5000;
pub struct Miner {
config: MinerConfig,
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
/// chain adapter to net
chain_adapter: Arc<ChainToNetAdapter>,
chain_adapter: Arc<ChainToPoolAndNetAdapter>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
}
impl Miner {
@ -45,13 +50,15 @@ impl Miner {
pub fn new(config: MinerConfig,
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>)
chain_adapter: Arc<ChainToPoolAndNetAdapter>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>)
-> Miner {
Miner {
config: config,
chain_head: chain_head,
chain_store: chain_store,
chain_adapter: chain_adapter,
tx_pool: tx_pool,
}
}
@ -59,7 +66,7 @@ impl Miner {
/// chain anytime required and looking for PoW solution.
pub fn run_loop(&self) {
info!("Starting miner loop.");
let mut coinbase = self.get_coinbase();
let mut coinbase = self.get_coinbase();
loop {
// get the latest chain state and build a block on top of it
let head: core::BlockHeader;
@ -77,7 +84,7 @@ impl Miner {
debug!("Mining at Cuckoo{} for at most 2 secs on block {} at difficulty {}.",
b.header.cuckoo_len,
latest_hash,
b.header.difficulty);
b.header.difficulty);
let mut iter_count = 0;
while head.hash() == latest_hash && time::get_time().sec < deadline {
let pow_hash = b.hash();
@ -110,7 +117,7 @@ impl Miner {
} else if let Ok(Some(tip)) = res {
let chain_head = self.chain_head.clone();
let mut head = chain_head.lock().unwrap();
coinbase = self.get_coinbase();
coinbase = self.get_coinbase();
*head = tip;
}
} else {
@ -122,7 +129,10 @@ impl Miner {
/// Builds a new block with the chain head as previous and eligible
/// transactions from the pool.
fn build_block(&self, head: &core::BlockHeader, coinbase: (core::Output, core::TxKernel)) -> core::Block {
fn build_block(&self,
head: &core::BlockHeader,
coinbase: (core::Output, core::TxKernel))
-> core::Block {
let mut now_sec = time::get_time().sec;
let head_sec = head.timestamp.to_timespec().sec;
if now_sec == head_sec {
@ -131,9 +141,10 @@ impl Miner {
let (difficulty, cuckoo_len) =
consensus::next_target(now_sec, head_sec, head.difficulty.clone(), head.cuckoo_len);
// TODO populate inputs and outputs from pool transactions
let (output, kernel) = coinbase;
let mut b = core::Block::with_reward(head, vec![], output, kernel).unwrap();
let txs_box = self.tx_pool.read().unwrap().prepare_mineable_transactions(MAX_TX);
let txs = txs_box.iter().map(|tx| tx.as_ref()).collect();
let (output, kernel) = coinbase;
let mut b = core::Block::with_reward(head, txs, output, kernel).unwrap();
let mut rng = rand::OsRng::new().unwrap();
b.header.nonce = rng.gen();
@ -145,21 +156,23 @@ impl Miner {
fn get_coinbase(&self) -> (core::Output, core::TxKernel) {
if self.config.burn_reward {
let mut rng = rand::OsRng::new().unwrap();
let mut rng = rand::OsRng::new().unwrap();
let secp_inst = secp::Secp256k1::with_caps(secp::ContextFlag::Commit);
let skey = secp::key::SecretKey::new(&secp_inst, &mut rng);
core::Block::reward_output(skey, &secp_inst).unwrap()
} else {
let url = format!("{}/v1/receive_coinbase", self.config.wallet_receiver_url.as_str());
let res: CbData = api::client::post(url.as_str(), &CbAmount { amount: consensus::REWARD })
core::Block::reward_output(skey, &secp_inst).unwrap()
} else {
let url = format!("{}/v1/receive_coinbase",
self.config.wallet_receiver_url.as_str());
let res: CbData = api::client::post(url.as_str(),
&CbAmount { amount: consensus::REWARD })
.expect("Wallet receiver unreachable, could not claim reward. Is it running?");
let out_bin = util::from_hex(res.output).unwrap();
let kern_bin = util::from_hex(res.kernel).unwrap();
let output = ser::deserialize(&mut &out_bin[..]).unwrap();
let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap();
let out_bin = util::from_hex(res.output).unwrap();
let kern_bin = util::from_hex(res.kernel).unwrap();
let output = ser::deserialize(&mut &out_bin[..]).unwrap();
let kernel = ser::deserialize(&mut &kern_bin[..]).unwrap();
(output, kernel)
}
(output, kernel)
}
}
}

View file

@ -17,7 +17,7 @@
//! as a facade.
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time;
@ -25,7 +25,7 @@ use futures::{future, Future, Stream};
use tokio_core::reactor;
use tokio_timer::Timer;
use adapters::{NetToChainAdapter, ChainToNetAdapter};
use adapters::*;
use api;
use chain;
use chain::ChainStore;
@ -33,6 +33,7 @@ use core;
use core::core::hash::Hashed;
use miner;
use p2p;
use pool;
use seed;
use store;
use sync;
@ -50,7 +51,9 @@ pub struct Server {
chain_store: Arc<chain::ChainStore>,
/// chain adapter to net, required for miner and anything that submits
/// blocks
chain_adapter: Arc<ChainToNetAdapter>,
chain_adapter: Arc<ChainToPoolAndNetAdapter>,
/// in-memory transaction pool
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
}
impl Server {
@ -82,10 +85,15 @@ impl Server {
let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?);
let chain_adapter = Arc::new(ChainToNetAdapter::new());
let pool_adapter = Arc::new(PoolToChainAdapter::new(shared_head.clone(),
chain_store.clone()));
let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(pool_adapter)));
let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone()));
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone(),
tx_pool.clone(),
peer_store.clone()));
let server =
Arc::new(p2p::Server::new(config.capabilities, config.p2p_config, net_adapter.clone()));
@ -107,7 +115,9 @@ impl Server {
evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ()));
api::start_rest_apis(config.api_http_addr.clone(), chain_store.clone());
api::start_rest_apis(config.api_http_addr.clone(),
chain_store.clone(),
tx_pool.clone());
warn!("Grin server started.");
Ok(Server {
@ -117,6 +127,7 @@ impl Server {
chain_head: shared_head,
chain_store: chain_store,
chain_adapter: chain_adapter,
tx_pool: tx_pool,
})
}
@ -137,7 +148,8 @@ impl Server {
let miner = miner::Miner::new(config,
self.chain_head.clone(),
self.chain_store.clone(),
self.chain_adapter.clone());
self.chain_adapter.clone(),
self.tx_pool.clone());
thread::spawn(move || {
miner.run_loop();
});

View file

@ -35,4 +35,4 @@ extern crate grin_core as core;
extern crate secp256k1zkp as secp;
pub use pool::TransactionPool;
pub use types::{BlockChain, PoolError};
pub use types::{BlockChain, TxSource, PoolError};

View file

@ -43,7 +43,7 @@ pub struct TransactionPool<T> {
}
impl<T> TransactionPool<T> where T: BlockChain {
fn new(chain: Arc<T>) -> TransactionPool<T> {
pub fn new(chain: Arc<T>) -> TransactionPool<T> {
TransactionPool{
transactions: HashMap::new(),
pool: Pool::empty(),
@ -380,8 +380,6 @@ impl<T> TransactionPool<T> where T: BlockChain {
conflicting_txs.append(&mut conflicting_outputs);
println!("Conflicting txs: {:?}", conflicting_txs);
for txh in conflicting_txs {
self.mark_transaction(txh, &mut marked_transactions);
}
@ -436,7 +434,6 @@ impl<T> TransactionPool<T> where T: BlockChain {
marked_transactions: HashMap<hash::Hash, ()>,)
->Vec<Box<transaction::Transaction>> {
println!("marked_txs: {:?}", marked_transactions);
let mut removed_txs = Vec::new();
for tx_hash in marked_transactions.keys() {
@ -876,8 +873,6 @@ mod tests {
}
tx_elements.push(build::with_fee(fees as u64));
println!("Fee was {}", fees as u64);
let (tx, _) = build::transaction(tx_elements).unwrap();
tx
}

View file

@ -116,9 +116,9 @@ impl Store {
/// extract only partial data. The underlying Readable size must align
/// accordingly. Encapsulates serialization.
pub fn get_ser_limited<T: ser::Readable>(&self,
key: &[u8],
len: usize)
-> Result<Option<T>, Error> {
key: &[u8],
len: usize)
-> Result<Option<T>, Error> {
let data = try!(self.get(key));
match data {
Some(val) => {
@ -187,6 +187,11 @@ impl<'a> Batch<'a> {
}
}
pub fn delete(mut self, key: &[u8]) -> Result<Batch<'a>, Error> {
self.batch.delete(key)?;
Ok(self)
}
/// Writes the batch to RocksDb.
pub fn write(self) -> Result<(), Error> {
self.store.write(self.batch)

View file

@ -17,7 +17,6 @@
use api;
use core::core::{Output, DEFAULT_OUTPUT, COINBASE_OUTPUT};
use core::core::hash::Hashed;
use secp::{self, pedersen};
use util;