Transaction broadcast (#209)

* Add transaction broadcast to all known peers once they have been
accepted by our own transaction pool.
* Some debug log

Fixes #200
This commit is contained in:
Ignotus Peverell 2017-10-25 21:06:24 +00:00 committed by GitHub
parent 7178b400b8
commit e2e24bc38e
7 changed files with 76 additions and 2 deletions

View file

@ -51,6 +51,13 @@ impl NetAdapter for NetToChainAdapter {
debug_name: "p2p".to_string(), debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(), identifier: "?.?.?.?".to_string(),
}; };
debug!(
LOGGER,
"Received tx {} from {}, going to process.",
tx.hash(),
source.identifier,
);
if let Err(e) = self.tx_pool.write().unwrap().add_to_memory_pool(source, tx) { if let Err(e) = self.tx_pool.write().unwrap().add_to_memory_pool(source, tx) {
error!(LOGGER, "Transaction rejected: {:?}", e); error!(LOGGER, "Transaction rejected: {:?}", e);
} }
@ -303,6 +310,30 @@ impl ChainToPoolAndNetAdapter {
} }
} }
/// Adapter between the transaction pool and the network, to relay
/// transactions that have been accepted.
pub struct PoolToNetAdapter {
p2p: OneTime<Arc<Server>>,
}
impl pool::PoolAdapter for PoolToNetAdapter {
fn tx_accepted(&self, tx: &core::Transaction) {
self.p2p.borrow().broadcast_transaction(tx);
}
}
impl PoolToNetAdapter {
/// Create a new pool to net adapter
pub fn new() -> PoolToNetAdapter {
PoolToNetAdapter { p2p: OneTime::new() }
}
/// Setup the p2p server on the adapter
pub fn init(&self, p2p: Arc<Server>) {
self.p2p.init(p2p);
}
}
/// Implements the view of the blockchain required by the TransactionPool to /// Implements the view of the blockchain required by the TransactionPool to
/// operate. Mostly needed to break any direct lifecycle or implementation /// operate. Mostly needed to break any direct lifecycle or implementation
/// dependency between the pool and the chain. /// dependency between the pool and the chain.

View file

@ -81,9 +81,11 @@ impl Server {
pub fn future(mut config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> { pub fn future(mut config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> {
let pool_adapter = Arc::new(PoolToChainAdapter::new()); let pool_adapter = Arc::new(PoolToChainAdapter::new());
let pool_net_adapter = Arc::new(PoolToNetAdapter::new());
let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new( let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(
config.pool_config.clone(), config.pool_config.clone(),
pool_adapter.clone(), pool_adapter.clone(),
pool_net_adapter.clone(),
))); )));
let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone()));
@ -114,6 +116,7 @@ impl Server {
net_adapter.clone(), net_adapter.clone(),
)); ));
chain_adapter.init(p2p_server.clone()); chain_adapter.init(p2p_server.clone());
pool_net_adapter.init(p2p_server.clone());
let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone()); let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone());
match config.seeding_type.clone() { match config.seeding_type.clone() {

View file

@ -139,6 +139,13 @@ impl Peer {
self.proto.send_block(b) self.proto.send_block(b)
} }
/// Sends the provided transaction to the remote peer. The request may be
/// dropped if the remote peer is known to already have the transaction.
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
// TODO do not send if the peer sent us the tx in the first place
self.proto.send_transaction(tx)
}
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> { pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.proto.send_header_request(locator) self.proto.send_header_request(locator)
} }

View file

@ -270,6 +270,20 @@ impl Server {
} }
} }
/// Broadcasts the provided transaction to all our peers. A peer
/// implementation may drop the broadcast request if it knows the
/// remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let peers = self.peers.write().unwrap();
for p in peers.deref() {
if p.is_connected() {
if let Err(e) = p.send_transaction(tx) {
debug!(LOGGER, "Error sending block to peer: {:?}", e);
}
}
}
}
/// Number of peers we're currently connected to. /// Number of peers we're currently connected to.
pub fn peer_count(&self) -> u32 { pub fn peer_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32 self.peers.read().unwrap().len() as u32

View file

@ -37,4 +37,4 @@ extern crate grin_keychain as keychain;
extern crate secp256k1zkp as secp; extern crate secp256k1zkp as secp;
pub use pool::TransactionPool; pub use pool::TransactionPool;
pub use types::{BlockChain, TxSource, PoolError, PoolConfig}; pub use types::{BlockChain, PoolAdapter, TxSource, PoolError, PoolConfig};

View file

@ -43,6 +43,7 @@ pub struct TransactionPool<T> {
// blockchain is a DummyChain, for now, which mimics what the future // blockchain is a DummyChain, for now, which mimics what the future
// chain will offer to the pool // chain will offer to the pool
blockchain: Arc<T>, blockchain: Arc<T>,
adapter: Arc<PoolAdapter>,
} }
impl<T> TransactionPool<T> impl<T> TransactionPool<T>
@ -50,13 +51,14 @@ where
T: BlockChain, T: BlockChain,
{ {
/// Create a new transaction pool /// Create a new transaction pool
pub fn new(config: PoolConfig, chain: Arc<T>) -> TransactionPool<T> { pub fn new(config: PoolConfig, chain: Arc<T>, adapter: Arc<PoolAdapter>) -> TransactionPool<T> {
TransactionPool { TransactionPool {
config: config, config: config,
transactions: HashMap::new(), transactions: HashMap::new(),
pool: Pool::empty(), pool: Pool::empty(),
orphans: Orphans::empty(), orphans: Orphans::empty(),
blockchain: chain, blockchain: chain,
adapter: adapter,
} }
} }
@ -241,6 +243,7 @@ where
); );
self.reconcile_orphans().unwrap(); self.reconcile_orphans().unwrap();
self.adapter.tx_accepted(&tx);
self.transactions.insert(tx_hash, Box::new(tx)); self.transactions.insert(tx_hash, Box::new(tx));
Ok(()) Ok(())
@ -1196,6 +1199,7 @@ mod tests {
pool: Pool::empty(), pool: Pool::empty(),
orphans: Orphans::empty(), orphans: Orphans::empty(),
blockchain: dummy_chain.clone(), blockchain: dummy_chain.clone(),
adapter: Arc::new(NoopAdapter{}),
} }
} }

View file

@ -163,6 +163,21 @@ pub trait BlockChain {
fn head_header(&self) -> Result<block::BlockHeader, PoolError>; fn head_header(&self) -> Result<block::BlockHeader, PoolError>;
} }
/// Bridge between the transaction pool and the rest of the system. Handles
/// downstream processing of valid transactions by the rest of the system, most
/// importantly the broadcasting of transactions to our peers.
pub trait PoolAdapter: Send + Sync {
/// The transaction pool has accepted this transactions as valid and added
/// it to its internal cache.
fn tx_accepted(&self, tx: &transaction::Transaction);
}
/// Dummy adapter used as a placeholder for real implementations
pub struct NoopAdapter {}
impl PoolAdapter for NoopAdapter {
fn tx_accepted(&self, _: &transaction::Transaction) {}
}
/// Pool contains the elements of the graph that are connected, in full, to /// Pool contains the elements of the graph that are connected, in full, to
/// the blockchain. /// the blockchain.
/// Reservations of outputs by orphan transactions (not fully connected) are /// Reservations of outputs by orphan transactions (not fully connected) are