diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 1fd7416ae..9d14f67dc 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -51,6 +51,13 @@ impl NetAdapter for NetToChainAdapter { debug_name: "p2p".to_string(), identifier: "?.?.?.?".to_string(), }; + debug!( + LOGGER, + "Received tx {} from {}, going to process.", + tx.hash(), + source.identifier, + ); + if let Err(e) = self.tx_pool.write().unwrap().add_to_memory_pool(source, tx) { error!(LOGGER, "Transaction rejected: {:?}", e); } @@ -303,6 +310,30 @@ impl ChainToPoolAndNetAdapter { } } +/// Adapter between the transaction pool and the network, to relay +/// transactions that have been accepted. +pub struct PoolToNetAdapter { + p2p: OneTime>, +} + +impl pool::PoolAdapter for PoolToNetAdapter { + fn tx_accepted(&self, tx: &core::Transaction) { + self.p2p.borrow().broadcast_transaction(tx); + } +} + +impl PoolToNetAdapter { + /// Create a new pool to net adapter + pub fn new() -> PoolToNetAdapter { + PoolToNetAdapter { p2p: OneTime::new() } + } + + /// Setup the p2p server on the adapter + pub fn init(&self, p2p: Arc) { + self.p2p.init(p2p); + } +} + /// Implements the view of the blockchain required by the TransactionPool to /// operate. Mostly needed to break any direct lifecycle or implementation /// dependency between the pool and the chain. diff --git a/grin/src/server.rs b/grin/src/server.rs index 0b708735f..3233535ab 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -81,9 +81,11 @@ impl Server { pub fn future(mut config: ServerConfig, evt_handle: &reactor::Handle) -> Result { let pool_adapter = Arc::new(PoolToChainAdapter::new()); + let pool_net_adapter = Arc::new(PoolToNetAdapter::new()); let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new( config.pool_config.clone(), pool_adapter.clone(), + pool_net_adapter.clone(), ))); let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); @@ -114,6 +116,7 @@ impl Server { net_adapter.clone(), )); chain_adapter.init(p2p_server.clone()); + pool_net_adapter.init(p2p_server.clone()); let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone()); match config.seeding_type.clone() { diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index a14cc15a7..893587d87 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -139,6 +139,13 @@ impl Peer { self.proto.send_block(b) } + /// Sends the provided transaction to the remote peer. The request may be + /// dropped if the remote peer is known to already have the transaction. + pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { + // TODO do not send if the peer sent us the tx in the first place + self.proto.send_transaction(tx) + } + pub fn send_header_request(&self, locator: Vec) -> Result<(), Error> { self.proto.send_header_request(locator) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index f39bf3e59..99b1d373e 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -270,6 +270,20 @@ impl Server { } } + /// Broadcasts the provided transaction to all our peers. A peer + /// implementation may drop the broadcast request if it knows the + /// remote peer already has the transaction. + pub fn broadcast_transaction(&self, tx: &core::Transaction) { + let peers = self.peers.write().unwrap(); + for p in peers.deref() { + if p.is_connected() { + if let Err(e) = p.send_transaction(tx) { + debug!(LOGGER, "Error sending block to peer: {:?}", e); + } + } + } + } + /// Number of peers we're currently connected to. pub fn peer_count(&self) -> u32 { self.peers.read().unwrap().len() as u32 diff --git a/pool/src/lib.rs b/pool/src/lib.rs index 0078ca4e1..6f3d1ac18 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -37,4 +37,4 @@ extern crate grin_keychain as keychain; extern crate secp256k1zkp as secp; pub use pool::TransactionPool; -pub use types::{BlockChain, TxSource, PoolError, PoolConfig}; +pub use types::{BlockChain, PoolAdapter, TxSource, PoolError, PoolConfig}; diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 2c069f48c..bed658ccb 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -43,6 +43,7 @@ pub struct TransactionPool { // blockchain is a DummyChain, for now, which mimics what the future // chain will offer to the pool blockchain: Arc, + adapter: Arc, } impl TransactionPool @@ -50,13 +51,14 @@ where T: BlockChain, { /// Create a new transaction pool - pub fn new(config: PoolConfig, chain: Arc) -> TransactionPool { + pub fn new(config: PoolConfig, chain: Arc, adapter: Arc) -> TransactionPool { TransactionPool { config: config, transactions: HashMap::new(), pool: Pool::empty(), orphans: Orphans::empty(), blockchain: chain, + adapter: adapter, } } @@ -241,6 +243,7 @@ where ); self.reconcile_orphans().unwrap(); + self.adapter.tx_accepted(&tx); self.transactions.insert(tx_hash, Box::new(tx)); Ok(()) @@ -1196,6 +1199,7 @@ mod tests { pool: Pool::empty(), orphans: Orphans::empty(), blockchain: dummy_chain.clone(), + adapter: Arc::new(NoopAdapter{}), } } diff --git a/pool/src/types.rs b/pool/src/types.rs index 1de3a666d..5b2798831 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -163,6 +163,21 @@ pub trait BlockChain { fn head_header(&self) -> Result; } +/// Bridge between the transaction pool and the rest of the system. Handles +/// downstream processing of valid transactions by the rest of the system, most +/// importantly the broadcasting of transactions to our peers. +pub trait PoolAdapter: Send + Sync { + /// The transaction pool has accepted this transactions as valid and added + /// it to its internal cache. + fn tx_accepted(&self, tx: &transaction::Transaction); +} + +/// Dummy adapter used as a placeholder for real implementations +pub struct NoopAdapter {} +impl PoolAdapter for NoopAdapter { + fn tx_accepted(&self, _: &transaction::Transaction) {} +} + /// Pool contains the elements of the graph that are connected, in full, to /// the blockchain. /// Reservations of outputs by orphan transactions (not fully connected) are