// Copyright 2018 The Grin Developers // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Adapters connecting new block, new transaction, and accepted transaction //! events to consumers of those events. use rand; use rand::Rng; use std::fs::File; use std::net::SocketAddr; use std::ops::Deref; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock, Weak}; use std::thread; use std::time::Instant; use chain::{self, ChainAdapter, Options, Tip}; use common::types::{ChainValidationMode, ServerConfig}; use core::core; use core::core::block::BlockHeader; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use core::core::transaction::Transaction; use p2p; use pool; use store; use util::LOGGER; use util::OneTime; // All adapters use `Weak` references instead of `Arc` to avoid cycles that // can never be destroyed. These 2 functions are simple helpers to reduce the // boilerplate of dealing with `Weak`. fn w(weak: &Weak) -> Arc { weak.upgrade().unwrap() } fn wo(weak_one: &OneTime>) -> Arc { w(weak_one.borrow().deref()) } /// Implementation of the NetAdapter for the blockchain. Gets notified when new /// blocks and transactions are received and forwards to the chain and pool /// implementations. pub struct NetToChainAdapter { currently_syncing: Arc, archive_mode: bool, chain: Weak, tx_pool: Arc>>, peers: OneTime>, config: ServerConfig, } impl p2p::ChainAdapter for NetToChainAdapter { fn total_difficulty(&self) -> Difficulty { w(&self.chain).total_difficulty() } fn total_height(&self) -> u64 { w(&self.chain).head().unwrap().height } fn transaction_received(&self, tx: core::Transaction, stem: bool) { let source = pool::TxSource { debug_name: "p2p".to_string(), identifier: "?.?.?.?".to_string(), }; debug!( LOGGER, "Received tx {} from {:?}, going to process.", tx.hash(), source, ); let h = tx.hash(); let res = { let mut tx_pool = self.tx_pool.write().unwrap(); tx_pool.add_to_pool(source, tx, stem) }; if let Err(e) = res { debug!(LOGGER, "Transaction {} rejected: {:?}", h, e); } } fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool { debug!( LOGGER, "Received block {} at {} from {}, going to process.", b.hash(), b.header.height, addr, ); self.process_block(b, addr) } fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool { let bhash = cb.hash(); debug!( LOGGER, "Received compact_block {} at {} from {}, going to process.", bhash, cb.header.height, addr, ); if cb.kern_ids.is_empty() { let block = core::Block::hydrate_from(cb, vec![]); // push the freshly hydrated block through the chain pipeline self.process_block(block, addr) } else { // TODO - do we need to validate the header here? let txs = { let tx_pool = self.tx_pool.read().unwrap(); tx_pool.retrieve_transactions(&cb) }; debug!(LOGGER, "adapter: txs from tx pool - {}", txs.len(),); // TODO - 3 scenarios here - // 1) we hydrate a valid block (good to go) // 2) we hydrate an invalid block (txs legit missing from our pool) // 3) we hydrate an invalid block (peer sent us a "bad" compact block) - [TBD] let block = core::Block::hydrate_from(cb.clone(), txs); let chain = self.chain .upgrade() .expect("failed to upgrade weak ref to chain"); if let Ok(sums) = chain.get_block_sums(&cb.header.previous) { if block.validate(&sums.output_sum, &sums.kernel_sum).is_ok() { debug!(LOGGER, "adapter: successfully hydrated block from tx pool!"); self.process_block(block, addr) } else { debug!( LOGGER, "adapter: block invalid after hydration, requesting full block" ); self.request_block(&cb.header, &addr); true } } else { debug!( LOGGER, "adapter: failed to retrieve previous block header (still syncing?)" ); true } } } fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool { let bhash = bh.hash(); debug!( LOGGER, "Received block header {} at {} from {}, going to process.", bhash, bh.height, addr, ); // pushing the new block header through the header chain pipeline // we will go ask for the block if this is a new header let res = w(&self.chain).process_block_header(&bh, self.chain_opts()); if let &Err(ref e) = &res { debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e); if e.is_bad_data() { debug!( LOGGER, "header_received: {} is a bad header, resetting header head", bhash ); let _ = w(&self.chain).reset_head(); return false; } else { // we got an error when trying to process the block header // but nothing serious enough to need to ban the peer upstream return true; } } // we have successfully processed a block header // so we can go request the block itself self.request_compact_block(&bh, &addr); // done receiving the header true } fn headers_received(&self, bhs: Vec, addr: SocketAddr) { info!( LOGGER, "Received block headers {:?} from {}", bhs.iter().map(|x| x.hash()).collect::>(), addr, ); // try to add each header to our header chain let mut added_hs = vec![]; for bh in bhs { let res = w(&self.chain).sync_block_header(&bh, self.chain_opts()); match res { Ok(_) => { added_hs.push(bh.hash()); } Err(chain::Error::Unfit(s)) => { info!( LOGGER, "Received unfit block header {} at {}: {}.", bh.hash(), bh.height, s ); } Err(chain::Error::StoreErr(e, explanation)) => { error!( LOGGER, "Store error processing block header {}: in {} {:?}", bh.hash(), explanation, e ); return; } Err(e) => { info!(LOGGER, "Invalid block header {}: {:?}.", bh.hash(), e); // TODO penalize peer somehow } } } let header_head = w(&self.chain).get_header_head().unwrap(); info!( LOGGER, "Added {} headers to the header chain. Last: {} at {}.", added_hs.len(), header_head.last_block_h, header_head.height, ); } fn locate_headers(&self, locator: Vec) -> Vec { debug!(LOGGER, "locate_headers: {:?}", locator,); let header = match self.find_common_header(locator) { Some(header) => header, None => return vec![], }; debug!(LOGGER, "locate_headers: common header: {:?}", header.hash(),); // looks like we know one, getting as many following headers as allowed let hh = header.height; let mut headers = vec![]; for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) { let header = w(&self.chain).get_header_by_height(h); match header { Ok(head) => headers.push(head), Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => break, Err(e) => { error!(LOGGER, "Could not build header locator: {:?}", e); return vec![]; } } } debug!( LOGGER, "locate_headers: returning headers: {}", headers.len(), ); headers } /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option { let b = w(&self.chain).get_block(&h); match b { Ok(b) => Some(b), _ => None, } } /// Provides a reading view into the current txhashset state as well as /// the required indexes for a consumer to rewind to a consistant state /// at the provided block hash. fn txhashset_read(&self, h: Hash) -> Option { match w(&self.chain).txhashset_read(h.clone()) { Ok((out_index, kernel_index, read)) => Some(p2p::TxHashSetRead { output_index: out_index, kernel_index: kernel_index, reader: read, }), Err(e) => { warn!( LOGGER, "Couldn't produce txhashset data for block {}: {:?}", h, e ); None } } } /// Writes a reading view on a txhashset state that's been provided to us. /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be /// rewound to the provided indexes. fn txhashset_write( &self, h: Hash, rewind_to_output: u64, rewind_to_kernel: u64, txhashset_data: File, _peer_addr: SocketAddr, ) -> bool { // TODO check whether we should accept any txhashset now if let Err(e) = w(&self.chain).txhashset_write(h, rewind_to_output, rewind_to_kernel, txhashset_data) { error!(LOGGER, "Failed to save txhashset archive: {:?}", e); !e.is_bad_data() } else { info!(LOGGER, "Received valid txhashset data for {}.", h); self.currently_syncing.store(true, Ordering::Relaxed); true } } } impl NetToChainAdapter { /// Construct a new NetToChainAdapter instance pub fn new( currently_syncing: Arc, archive_mode: bool, chain_ref: Weak, tx_pool: Arc>>, config: ServerConfig, ) -> NetToChainAdapter { NetToChainAdapter { currently_syncing, archive_mode, chain: chain_ref, tx_pool, peers: OneTime::new(), config, } } /// Initialize a NetToChainAdaptor with reference to a Peers object. /// Should only be called once. pub fn init(&self, peers: Weak) { self.peers.init(peers); } // recursively go back through the locator vector and stop when we find // a header that we recognize this will be a header shared in common // between us and the peer fn find_common_header(&self, locator: Vec) -> Option { if locator.len() == 0 { return None; } let chain = w(&self.chain); let known = chain.get_block_header(&locator[0]); match known { Ok(header) => { // even if we know the block, it may not be on our winning chain let known_winning = chain.get_header_by_height(header.height); if let Ok(known_winning) = known_winning { if known_winning.hash() != header.hash() { self.find_common_header(locator[1..].to_vec()) } else { Some(header) } } else { self.find_common_header(locator[1..].to_vec()) } } Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => { self.find_common_header(locator[1..].to_vec()) } Err(e) => { error!(LOGGER, "Could not build header locator: {:?}", e); None } } } // pushing the new block through the chain pipeline // remembering to reset the head if we have a bad block fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool { let prev_hash = b.header.previous; let bhash = b.hash(); let chain = w(&self.chain); match chain.process_block(b, self.chain_opts()) { Ok((tip, _)) => { self.validate_chain(bhash); self.check_compact(tip); true } Err(chain::Error::Orphan) => { // make sure we did not miss the parent block if !chain.is_orphan(&prev_hash) && !self.currently_syncing.load(Ordering::Relaxed) { debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash); self.request_block_by_hash(prev_hash, &addr) } true } Err(ref e) if e.is_bad_data() => { debug!( LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash ); let _ = chain.reset_head(); // we potentially changed the state of the system here // so check everything is still ok self.validate_chain(bhash); false } Err(e) => { debug!( LOGGER, "adapter: process_block: block {} refused by chain: {:?}", bhash, e ); true } } } fn validate_chain(&self, bhash: Hash) { // If we are running in "validate the full chain every block" then // panic here if validation fails for any reason. // We are out of consensus at this point and want to track the problem // down as soon as possible. // Skip this if we are currently syncing (too slow). let chain = w(&self.chain); if chain.head().unwrap().height > 0 && !self.currently_syncing.load(Ordering::Relaxed) && self.config.chain_validation_mode == ChainValidationMode::EveryBlock { let now = Instant::now(); debug!( LOGGER, "adapter: process_block: ***** validating full chain state at {}", bhash, ); let chain = w(&self.chain); chain .validate(true) .expect("chain validation failed, hard stop"); debug!( LOGGER, "adapter: process_block: ***** done validating full chain state, took {}s", now.elapsed().as_secs(), ); } } fn check_compact(&self, tip_res: Option) { // no compaction during sync or if we're in historical mode if self.archive_mode || self.currently_syncing.load(Ordering::Relaxed) { return; } if let Some(tip) = tip_res { // trigger compaction every 2000 blocks, uses a different thread to avoid // blocking the caller thread (likely a peer) if tip.height % 2000 == 0 { let chain = w(&self.chain); let _ = thread::Builder::new() .name("compactor".to_string()) .spawn(move || { if let Err(e) = chain.compact() { error!(LOGGER, "Could not compact chain: {:?}", e); } }); } } } // After receiving a compact block if we cannot successfully hydrate // it into a full block then fallback to requesting the full block // from the same peer that gave us the compact block // consider additional peers for redundancy? fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) { self.request_block_by_hash(bh.hash(), addr) } fn request_block_by_hash(&self, h: Hash, addr: &SocketAddr) { self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h)) } // After we have received a block header in "header first" propagation // we need to go request the block (compact representation) from the // same peer that gave us the header (unless we have already accepted the block) fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) { self.send_block_request_to_peer(bh.hash(), addr, |peer, h| { peer.send_compact_block_request(h) }) } fn send_block_request_to_peer(&self, h: Hash, addr: &SocketAddr, f: F) where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>, { match w(&self.chain).block_exists(h) { Ok(false) => { match wo(&self.peers).get_connected_peer(addr) { None => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, not connected", addr), Some(peer) => { match peer.read() { Err(e) => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, read fails: {:?}", addr, e), Ok(p) => { if let Err(e) = f(&p, h) { error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e) } } } } } } Ok(true) => debug!(LOGGER, "send_block_request_to_peer: block {} already known", h), Err(e) => error!(LOGGER, "send_block_request_to_peer: failed to check block exists: {:?}", e) } } /// Prepare options for the chain pipeline fn chain_opts(&self) -> chain::Options { let opts = if self.currently_syncing.load(Ordering::Relaxed) { chain::Options::SYNC } else { chain::Options::NONE }; opts } } /// Implementation of the ChainAdapter for the network. Gets notified when the /// blockchain accepted a new block, asking the pool to update its state and /// the network to broadcast the block pub struct ChainToPoolAndNetAdapter { tx_pool: Arc>>, peers: OneTime>, } impl ChainAdapter for ChainToPoolAndNetAdapter { fn block_accepted(&self, b: &core::Block, opts: Options) { debug!(LOGGER, "adapter: block_accepted: {:?}", b.hash()); if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) { error!( LOGGER, "Pool could not update itself at block {}: {:?}", b.hash(), e, ); } // If we mined the block then we want to broadcast the block itself. // If block is empty then broadcast the block. // If block contains txs then broadcast the compact block. // If we received the block from another node then broadcast "header first" // to minimize network traffic. if opts.contains(Options::MINE) { // propagate compact block out if we mined the block // but broadcast full block if we have no txs let cb = b.as_compact_block(); if cb.kern_ids.is_empty() { // in the interest of testing all code paths // randomly decide how we send an empty block out // TODO - lock this down once we are comfortable it works... let mut rng = rand::thread_rng(); if rng.gen() { wo(&self.peers).broadcast_block(&b); } else { wo(&self.peers).broadcast_compact_block(&cb); } } else { wo(&self.peers).broadcast_compact_block(&cb); } } else { // "header first" propagation if we are not the originator of this block // again randomly chose between "header first" or "compact block" propagation // to ensure we test a wide variety of code paths let mut rng = rand::thread_rng(); if rng.gen() { wo(&self.peers).broadcast_header(&b.header); } else { let cb = b.as_compact_block(); wo(&self.peers).broadcast_compact_block(&cb); } } } } impl ChainToPoolAndNetAdapter { /// Construct a ChainToPoolAndNetAdaper instance. pub fn new( tx_pool: Arc>>, ) -> ChainToPoolAndNetAdapter { ChainToPoolAndNetAdapter { tx_pool: tx_pool, peers: OneTime::new(), } } /// Initialize a ChainToPoolAndNetAdapter instance with hanlde to a Peers /// object. Should only be called once. pub fn init(&self, peers: Weak) { self.peers.init(peers); } } /// Adapter between the transaction pool and the network, to relay /// transactions that have been accepted. pub struct PoolToNetAdapter { peers: OneTime>, } impl pool::PoolAdapter for PoolToNetAdapter { fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { wo(&self.peers) .broadcast_stem_transaction(tx) .map_err(|_| pool::PoolError::DandelionError)?; Ok(()) } fn tx_accepted(&self, tx: &core::Transaction) { wo(&self.peers).broadcast_transaction(tx); } } impl PoolToNetAdapter { /// Create a new pool to net adapter pub fn new() -> PoolToNetAdapter { PoolToNetAdapter { peers: OneTime::new(), } } /// Setup the p2p server on the adapter pub fn init(&self, peers: Weak) { self.peers.init(peers); } } /// Implements the view of the blockchain required by the TransactionPool to /// operate. Mostly needed to break any direct lifecycle or implementation /// dependency between the pool and the chain. #[derive(Clone)] pub struct PoolToChainAdapter { chain: OneTime>, } impl PoolToChainAdapter { /// Create a new pool adapter pub fn new() -> PoolToChainAdapter { PoolToChainAdapter { chain: OneTime::new(), } } /// Set the pool adapter's chain. Should only be called once. pub fn set_chain(&self, chain_ref: Weak) { self.chain.init(chain_ref); } } impl pool::BlockChain for PoolToChainAdapter { fn validate_raw_txs( &self, txs: Vec, pre_tx: Option, ) -> Result<(Vec), pool::PoolError> { wo(&self.chain).validate_raw_txs(txs, pre_tx).map_err(|_| { pool::PoolError::Other("Chain adapter failed to validate_raw_txs.".to_string()) }) } fn verify_coinbase_maturity(&self, tx: &Transaction) -> Result<(), pool::PoolError> { wo(&self.chain) .verify_coinbase_maturity(tx) .map_err(|_| pool::PoolError::ImmatureCoinbase) } fn verify_tx_lock_height(&self, tx: &Transaction) -> Result<(), pool::PoolError> { wo(&self.chain) .verify_tx_lock_height(tx) .map_err(|_| pool::PoolError::ImmatureTransaction) } }