From 94732b0d583d4b8b043895a120ac74f8b9952942 Mon Sep 17 00:00:00 2001 From: hashmap Date: Mon, 8 Apr 2019 22:13:28 +0200 Subject: [PATCH] Return Result from methods of ChainAdapter (#2722) Most of the methods return nothing or bool which is used to decide if a sender of a message should be banned or not. However underlying chain implementation may fail so we need a way to reflect this fact in API. Also it allows to reduce number of unwraps and makes the code more robust. --- Cargo.lock | 1 + p2p/Cargo.toml | 1 + p2p/src/lib.rs | 1 + p2p/src/peer.rs | 43 ++++++--- p2p/src/peers.rs | 116 ++++++++++++++++-------- p2p/src/protocol.rs | 22 ++--- p2p/src/serv.rs | 77 ++++++++++------ p2p/src/store.rs | 3 +- p2p/src/types.rs | 46 +++++++--- servers/src/common/adapters.rs | 137 ++++++++++++++++++----------- servers/src/grin/seed.rs | 8 +- servers/src/grin/sync/body_sync.rs | 2 +- servers/src/grin/sync/syncer.rs | 2 +- 13 files changed, 303 insertions(+), 156 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcc5b157f..3d8bb0ef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -853,6 +853,7 @@ dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "grin_chain 1.0.3", "grin_core 1.0.3", "grin_pool 1.0.3", "grin_store 1.0.3", diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 757914143..a0f3793e7 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -24,6 +24,7 @@ chrono = { version = "0.4.4", features = ["serde"] } grin_core = { path = "../core", version = "1.0.3" } grin_store = { path = "../store", version = "1.0.3" } +grin_chain = { path = "../chain", version = "1.0.3" } grin_util = { path = "../util", version = "1.0.3" } [dev-dependencies] diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 3ba530201..18d33b043 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -29,6 +29,7 @@ use lmdb_zero as lmdb; #[macro_use] extern crate grin_core as core; +use grin_chain as chain; use grin_util as util; #[macro_use] diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 72c66ec56..ce6daec54 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -17,6 +17,7 @@ use std::fs::File; use std::net::{Shutdown, TcpStream}; use std::sync::Arc; +use crate::chain; use crate::conn; use crate::core::core::hash::{Hash, Hashed}; use crate::core::pow::Difficulty; @@ -513,11 +514,11 @@ impl TrackingAdapter { } impl ChainAdapter for TrackingAdapter { - fn total_difficulty(&self) -> Difficulty { + fn total_difficulty(&self) -> Result { self.adapter.total_difficulty() } - fn total_height(&self) -> u64 { + fn total_height(&self) -> Result { self.adapter.total_height() } @@ -525,12 +526,16 @@ impl ChainAdapter for TrackingAdapter { self.adapter.get_transaction(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) { + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result { self.push_recv(kernel_hash); self.adapter.tx_kernel_received(kernel_hash, addr) } - fn transaction_received(&self, tx: core::Transaction, stem: bool) { + fn transaction_received( + &self, + tx: core::Transaction, + stem: bool, + ) -> Result { // Do not track the tx hash for stem txs. // Otherwise we fail to handle the subsequent fluff or embargo expiration // correctly. @@ -541,27 +546,40 @@ impl ChainAdapter for TrackingAdapter { self.adapter.transaction_received(tx, stem) } - fn block_received(&self, b: core::Block, addr: PeerAddr, _was_requested: bool) -> bool { + fn block_received( + &self, + b: core::Block, + addr: PeerAddr, + _was_requested: bool, + ) -> Result { let bh = b.hash(); self.push_recv(bh); self.adapter.block_received(b, addr, self.has_req(bh)) } - fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool { + fn compact_block_received( + &self, + cb: core::CompactBlock, + addr: PeerAddr, + ) -> Result { self.push_recv(cb.hash()); self.adapter.compact_block_received(cb, addr) } - fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool { + fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result { self.push_recv(bh.hash()); self.adapter.header_received(bh, addr) } - fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool { + fn headers_received( + &self, + bh: &[core::BlockHeader], + addr: PeerAddr, + ) -> Result { self.adapter.headers_received(bh, addr) } - fn locate_headers(&self, locator: &[Hash]) -> Vec { + fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { self.adapter.locate_headers(locator) } @@ -577,7 +595,12 @@ impl ChainAdapter for TrackingAdapter { self.adapter.txhashset_receive_ready() } - fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool { + fn txhashset_write( + &self, + h: Hash, + txhashset_data: File, + peer_addr: PeerAddr, + ) -> Result { self.adapter.txhashset_write(h, txhashset_data, peer_addr) } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 06865e37b..16ac4b5ed 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use rand::{thread_rng, Rng}; +use crate::chain; use crate::core::core; use crate::core::core::hash::{Hash, Hashed}; use crate::core::global; @@ -171,13 +172,13 @@ impl Peers { // Return vec of connected peers that currently advertise more work // (total_difficulty) than we do. - pub fn more_work_peers(&self) -> Vec> { + pub fn more_work_peers(&self) -> Result>, chain::Error> { let peers = self.connected_peers(); if peers.len() == 0 { - return vec![]; + return Ok(vec![]); } - let total_difficulty = self.total_difficulty(); + let total_difficulty = self.total_difficulty()?; let mut max_peers = peers .into_iter() @@ -185,28 +186,34 @@ impl Peers { .collect::>(); thread_rng().shuffle(&mut max_peers); - max_peers + Ok(max_peers) } // Return number of connected peers that currently advertise more/same work // (total_difficulty) than/as we do. - pub fn more_or_same_work_peers(&self) -> usize { + pub fn more_or_same_work_peers(&self) -> Result { let peers = self.connected_peers(); if peers.len() == 0 { - return 0; + return Ok(0); } - let total_difficulty = self.total_difficulty(); + let total_difficulty = self.total_difficulty()?; - peers + Ok(peers .iter() .filter(|x| x.info.total_difficulty() >= total_difficulty) - .count() + .count()) } /// Returns single random peer with more work than us. pub fn more_work_peer(&self) -> Option> { - self.more_work_peers().pop() + match self.more_work_peers() { + Ok(mut peers) => peers.pop(), + Err(e) => { + error!("failed to get more work peers: {:?}", e); + None + } + } } /// Return vec of connected peers that currently have the most worked @@ -452,10 +459,15 @@ impl Peers { rm.push(peer.info.addr.clone()); } else { let (stuck, diff) = peer.is_stuck(); - if stuck && diff < self.adapter.total_difficulty() { - debug!("clean_peers {:?}, stuck peer", peer.info.addr); - let _ = self.update_state(peer.info.addr, State::Defunct); - rm.push(peer.info.addr.clone()); + match self.adapter.total_difficulty() { + Ok(total_difficulty) => { + if stuck && diff < total_difficulty { + debug!("clean_peers {:?}, stuck peer", peer.info.addr); + let _ = self.update_state(peer.info.addr, State::Defunct); + rm.push(peer.info.addr.clone()); + } + } + Err(e) => error!("failed to get total difficulty: {:?}", e), } } } @@ -529,11 +541,11 @@ impl Peers { } impl ChainAdapter for Peers { - fn total_difficulty(&self) -> Difficulty { + fn total_difficulty(&self) -> Result { self.adapter.total_difficulty() } - fn total_height(&self) -> u64 { + fn total_height(&self) -> Result { self.adapter.total_height() } @@ -541,17 +553,26 @@ impl ChainAdapter for Peers { self.adapter.get_transaction(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) { + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result { self.adapter.tx_kernel_received(kernel_hash, addr) } - fn transaction_received(&self, tx: core::Transaction, stem: bool) { + fn transaction_received( + &self, + tx: core::Transaction, + stem: bool, + ) -> Result { self.adapter.transaction_received(tx, stem) } - fn block_received(&self, b: core::Block, peer_addr: PeerAddr, was_requested: bool) -> bool { + fn block_received( + &self, + b: core::Block, + peer_addr: PeerAddr, + was_requested: bool, + ) -> Result { let hash = b.hash(); - if !self.adapter.block_received(b, peer_addr, was_requested) { + if !self.adapter.block_received(b, peer_addr, was_requested)? { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( @@ -559,15 +580,19 @@ impl ChainAdapter for Peers { hash, peer_addr ); self.ban_peer(peer_addr, ReasonForBan::BadBlock); - false + Ok(false) } else { - true + Ok(true) } } - fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: PeerAddr) -> bool { + fn compact_block_received( + &self, + cb: core::CompactBlock, + peer_addr: PeerAddr, + ) -> Result { let hash = cb.hash(); - if !self.adapter.compact_block_received(cb, peer_addr) { + if !self.adapter.compact_block_received(cb, peer_addr)? { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban debug!( @@ -575,35 +600,43 @@ impl ChainAdapter for Peers { hash, peer_addr ); self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock); - false + Ok(false) } else { - true + Ok(true) } } - fn header_received(&self, bh: core::BlockHeader, peer_addr: PeerAddr) -> bool { - if !self.adapter.header_received(bh, peer_addr) { + fn header_received( + &self, + bh: core::BlockHeader, + peer_addr: PeerAddr, + ) -> Result { + if !self.adapter.header_received(bh, peer_addr)? { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); - false + Ok(false) } else { - true + Ok(true) } } - fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: PeerAddr) -> bool { - if !self.adapter.headers_received(headers, peer_addr) { + fn headers_received( + &self, + headers: &[core::BlockHeader], + peer_addr: PeerAddr, + ) -> Result { + if !self.adapter.headers_received(headers, peer_addr)? { // if the peer sent us a block header that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader); - false + Ok(false) } else { - true + Ok(true) } } - fn locate_headers(&self, hs: &[Hash]) -> Vec { + fn locate_headers(&self, hs: &[Hash]) -> Result, chain::Error> { self.adapter.locate_headers(hs) } @@ -619,16 +652,21 @@ impl ChainAdapter for Peers { self.adapter.txhashset_receive_ready() } - fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool { - if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) { + fn txhashset_write( + &self, + h: Hash, + txhashset_data: File, + peer_addr: PeerAddr, + ) -> Result { + if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? { debug!( "Received a bad txhashset data from {}, the peer will be banned", &peer_addr ); self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet); - false + Ok(false) } else { - true + Ok(true) } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index cc49abd3f..b98df4240 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -68,8 +68,8 @@ impl MessageHandler for Protocol { Ok(Some(Response::new( Type::Pong, Pong { - total_difficulty: adapter.total_difficulty(), - height: adapter.total_height(), + total_difficulty: adapter.total_difficulty()?, + height: adapter.total_height()?, }, writer, )?)) @@ -93,7 +93,7 @@ impl MessageHandler for Protocol { "handle_payload: received tx kernel: {}, msg_len: {}", h, msg.header.msg_len ); - adapter.tx_kernel_received(h, self.addr); + adapter.tx_kernel_received(h, self.addr)?; Ok(None) } @@ -117,7 +117,7 @@ impl MessageHandler for Protocol { msg.header.msg_len ); let tx: core::Transaction = msg.body()?; - adapter.transaction_received(tx, false); + adapter.transaction_received(tx, false)?; Ok(None) } @@ -127,7 +127,7 @@ impl MessageHandler for Protocol { msg.header.msg_len ); let tx: core::Transaction = msg.body()?; - adapter.transaction_received(tx, true); + adapter.transaction_received(tx, true)?; Ok(None) } @@ -155,7 +155,7 @@ impl MessageHandler for Protocol { // we can't know at this level whether we requested the block or not, // the boolean should be properly set in higher level adapter - adapter.block_received(b, self.addr, false); + adapter.block_received(b, self.addr, false)?; Ok(None) } @@ -176,14 +176,14 @@ impl MessageHandler for Protocol { ); let b: core::CompactBlock = msg.body()?; - adapter.compact_block_received(b, self.addr); + adapter.compact_block_received(b, self.addr)?; Ok(None) } Type::GetHeaders => { // load headers from the locator let loc: Locator = msg.body()?; - let headers = adapter.locate_headers(&loc.hashes); + let headers = adapter.locate_headers(&loc.hashes)?; // serialize and send all the headers over Ok(Some(Response::new( @@ -197,7 +197,7 @@ impl MessageHandler for Protocol { // we can go request it from some of our peers Type::Header => { let header: core::BlockHeader = msg.body()?; - adapter.header_received(header, self.addr); + adapter.header_received(header, self.addr)?; Ok(None) } @@ -217,7 +217,7 @@ impl MessageHandler for Protocol { headers.push(header); total_bytes_read += bytes_read; } - adapter.headers_received(&headers, self.addr); + adapter.headers_received(&headers, self.addr)?; } // Now check we read the correct total number of bytes off the stream. @@ -335,7 +335,7 @@ impl MessageHandler for Protocol { let tmp_zip = File::open(tmp)?; let res = self .adapter - .txhashset_write(sm_arch.hash, tmp_zip, self.addr); + .txhashset_write(sm_arch.hash, tmp_zip, self.addr)?; debug!( "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 8be990846..e7038493a 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -12,19 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fs::File; -use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::Arc; -use std::time::Duration; -use std::{io, thread}; - -use crate::lmdb; - +use crate::chain; use crate::core::core; use crate::core::core::hash::Hash; use crate::core::global; use crate::core::pow::Difficulty; use crate::handshake::Handshake; +use crate::lmdb; use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; @@ -33,6 +27,11 @@ use crate::types::{ }; use crate::util::{Mutex, StopState}; use chrono::prelude::{DateTime, Utc}; +use std::fs::File; +use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; +use std::sync::Arc; +use std::time::Duration; +use std::{io, thread}; /// P2P server implementation, handling bootstrapping to find and connect to /// peers, receiving connections from other peers and keep track of all of them. @@ -139,7 +138,7 @@ impl Server { match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) { Ok(mut stream) => { let addr = SocketAddr::new(self.config.host, self.config.port); - let total_diff = self.peers.total_difficulty(); + let total_diff = self.peers.total_difficulty()?; let mut peer = Peer::connect( &mut stream, @@ -168,7 +167,7 @@ impl Server { } fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> { - let total_diff = self.peers.total_difficulty(); + let total_diff = self.peers.total_difficulty()?; // accept the peer and add it to the server map let mut peer = Peer::accept( @@ -231,31 +230,48 @@ impl Server { pub struct DummyAdapter {} impl ChainAdapter for DummyAdapter { - fn total_difficulty(&self) -> Difficulty { - Difficulty::min() + fn total_difficulty(&self) -> Result { + Ok(Difficulty::min()) } - fn total_height(&self) -> u64 { - 0 + fn total_height(&self) -> Result { + Ok(0) } fn get_transaction(&self, _h: Hash) -> Option { None } - fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) {} - fn transaction_received(&self, _: core::Transaction, _stem: bool) {} - fn compact_block_received(&self, _cb: core::CompactBlock, _addr: PeerAddr) -> bool { - true + + fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) -> Result { + Ok(true) } - fn header_received(&self, _bh: core::BlockHeader, _addr: PeerAddr) -> bool { - true + fn transaction_received( + &self, + _: core::Transaction, + _stem: bool, + ) -> Result { + Ok(true) } - fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> bool { - true + fn compact_block_received( + &self, + _cb: core::CompactBlock, + _addr: PeerAddr, + ) -> Result { + Ok(true) } - fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> bool { - true + fn header_received( + &self, + _bh: core::BlockHeader, + _addr: PeerAddr, + ) -> Result { + Ok(true) } - fn locate_headers(&self, _: &[Hash]) -> Vec { - vec![] + fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> Result { + Ok(true) + } + fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> Result { + Ok(true) + } + fn locate_headers(&self, _: &[Hash]) -> Result, chain::Error> { + Ok(vec![]) } fn get_block(&self, _: Hash) -> Option { None @@ -268,8 +284,13 @@ impl ChainAdapter for DummyAdapter { false } - fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: PeerAddr) -> bool { - false + fn txhashset_write( + &self, + _h: Hash, + _txhashset_data: File, + _peer_addr: PeerAddr, + ) -> Result { + Ok(false) } fn txhashset_download_update( diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 7bb88ebf7..0727abb09 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -168,7 +168,8 @@ impl PeerStore { /// Used for /v1/peers/all api endpoint pub fn all_peers(&self) -> Result, Error> { let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes()); - Ok(self.db + Ok(self + .db .iter::(&key)? .map(|(_, v)| v) .collect::>()) diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 9481c357a..5bf2b8e4e 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use chrono::prelude::*; +use crate::chain; use crate::core::core; use crate::core::core::hash::Hash; use crate::core::global; @@ -63,6 +64,7 @@ pub enum Error { ConnectionClose, Timeout, Store(grin_store::Error), + Chain(chain::Error), PeerWithSelf, NoDandelionRelay, ProtocolMismatch { @@ -88,6 +90,11 @@ impl From for Error { Error::Store(e) } } +impl From for Error { + fn from(e: chain::Error) -> Error { + Error::Chain(e) + } +} impl From for Error { fn from(e: io::Error) -> Error { Error::Connection(e) @@ -465,37 +472,51 @@ pub struct TxHashSetRead { /// other things. pub trait ChainAdapter: Sync + Send { /// Current total difficulty on our chain - fn total_difficulty(&self) -> Difficulty; + fn total_difficulty(&self) -> Result; /// Current total height - fn total_height(&self) -> u64; + fn total_height(&self) -> Result; /// A valid transaction has been received from one of our peers - fn transaction_received(&self, tx: core::Transaction, stem: bool); + fn transaction_received(&self, tx: core::Transaction, stem: bool) + -> Result; fn get_transaction(&self, kernel_hash: Hash) -> Option; - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr); + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result; /// A block has been received from one of our peers. Returns true if the /// block could be handled properly and is not deemed defective by the /// chain. Returning false means the block will never be valid and /// may result in the peer being banned. - fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool; + fn block_received( + &self, + b: core::Block, + addr: PeerAddr, + was_requested: bool, + ) -> Result; - fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool; + fn compact_block_received( + &self, + cb: core::CompactBlock, + addr: PeerAddr, + ) -> Result; - fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool; + fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result; /// A set of block header has been received, typically in response to a /// block /// header request. - fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool; + fn headers_received( + &self, + bh: &[core::BlockHeader], + addr: PeerAddr, + ) -> Result; /// Finds a list of block headers based on the provided locator. Tries to /// identify the common chain and gets the headers that follow it /// immediately. - fn locate_headers(&self, locator: &[Hash]) -> Vec; + fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error>; /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option; @@ -523,7 +544,12 @@ pub trait ChainAdapter: Sync + Send { /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be /// rewound to the provided indexes. - fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool; + fn txhashset_write( + &self, + h: Hash, + txhashset_data: File, + peer_addr: PeerAddr, + ) -> Result; } /// Additional methods required by the protocol that don't need to be diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 47eb49a1f..9aeda5334 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -50,22 +50,22 @@ pub struct NetToChainAdapter { } impl p2p::ChainAdapter for NetToChainAdapter { - fn total_difficulty(&self) -> Difficulty { - self.chain().head().unwrap().total_difficulty + fn total_difficulty(&self) -> Result { + Ok(self.chain().head()?.total_difficulty) } - fn total_height(&self) -> u64 { - self.chain().head().unwrap().height + fn total_height(&self) -> Result { + Ok(self.chain().head()?.height) } fn get_transaction(&self, kernel_hash: Hash) -> Option { self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash) } - fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) { + fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result { // nothing much we can do with a new transaction while syncing if self.sync_state.is_syncing() { - return; + return Ok(true); } let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash); @@ -73,12 +73,17 @@ impl p2p::ChainAdapter for NetToChainAdapter { if tx.is_none() { self.request_transaction(kernel_hash, addr); } + Ok(true) } - fn transaction_received(&self, tx: core::Transaction, stem: bool) { + fn transaction_received( + &self, + tx: core::Transaction, + stem: bool, + ) -> Result { // nothing much we can do with a new transaction while syncing if self.sync_state.is_syncing() { - return; + return Ok(true); } let source = pool::TxSource { @@ -87,7 +92,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { }; let tx_hash = tx.hash(); - let header = self.chain().head_header().unwrap(); + let header = self.chain().head_header()?; debug!( "Received tx {}, [in/out/kern: {}/{}/{}] going to process.", @@ -97,17 +102,22 @@ impl p2p::ChainAdapter for NetToChainAdapter { tx.kernels().len(), ); - let res = { - let mut tx_pool = self.tx_pool.write(); - tx_pool.add_to_pool(source, tx, stem, &header) - }; - - if let Err(e) = res { - debug!("Transaction {} rejected: {:?}", tx_hash, e); + let mut tx_pool = self.tx_pool.write(); + match tx_pool.add_to_pool(source, tx, stem, &header) { + Ok(_) => Ok(true), + Err(e) => { + debug!("Transaction {} rejected: {:?}", tx_hash, e); + Ok(false) + } } } - fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool { + fn block_received( + &self, + b: core::Block, + addr: PeerAddr, + was_requested: bool, + ) -> Result { debug!( "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", b.hash(), @@ -120,7 +130,11 @@ impl p2p::ChainAdapter for NetToChainAdapter { self.process_block(b, addr, was_requested) } - fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool { + fn compact_block_received( + &self, + cb: core::CompactBlock, + addr: PeerAddr, + ) -> Result { let bhash = cb.hash(); debug!( "Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.", @@ -139,7 +153,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { Ok(block) => self.process_block(block, addr, false), Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb_hash, e); - return false; + return Ok(false); } } } else { @@ -149,7 +163,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { .process_block_header(&cb.header, self.chain_opts(false)) { debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind()); - return !e.is_bad_data(); + return Ok(!e.is_bad_data()); } let (txs, missing_short_ids) = { @@ -173,7 +187,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { Ok(block) => block, Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb.hash(), e); - return false; + return Ok(false); } }; @@ -188,20 +202,20 @@ impl p2p::ChainAdapter for NetToChainAdapter { if self.sync_state.status() == SyncStatus::NoSync { debug!("adapter: block invalid after hydration, requesting full block"); self.request_block(&cb.header, addr); - true + Ok(true) } else { debug!("block invalid after hydration, ignoring it, cause still syncing"); - true + Ok(true) } } } else { debug!("failed to retrieve previous block header (still syncing?)"); - true + Ok(true) } } } - fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool { + fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result { let bhash = bh.hash(); debug!( "Received block header {} at {} from {}, going to process.", @@ -214,14 +228,14 @@ impl p2p::ChainAdapter for NetToChainAdapter { .chain() .process_block_header(&bh, self.chain_opts(false)); - if let &Err(ref e) = &res { + if let Err(e) = res { debug!("Block header {} refused by chain: {:?}", bhash, e.kind()); if e.is_bad_data() { - return false; + return Ok(false); } else { // we got an error when trying to process the block header // but nothing serious enough to need to ban the peer upstream - return true; + return Err(e); } } @@ -230,37 +244,43 @@ impl p2p::ChainAdapter for NetToChainAdapter { self.request_compact_block(&bh, addr); // done receiving the header - true + Ok(true) } - fn headers_received(&self, bhs: &[core::BlockHeader], addr: PeerAddr) -> bool { + fn headers_received( + &self, + bhs: &[core::BlockHeader], + addr: PeerAddr, + ) -> Result { info!("Received {} block headers from {}", bhs.len(), addr,); if bhs.len() == 0 { - return false; + return Ok(false); } // try to add headers to our header chain - let res = self.chain().sync_block_headers(bhs, self.chain_opts(true)); - if let &Err(ref e) = &res { - debug!("Block headers refused by chain: {:?}", e); - - if e.is_bad_data() { - return false; + match self.chain().sync_block_headers(bhs, self.chain_opts(true)) { + Ok(_) => Ok(true), + Err(e) => { + debug!("Block headers refused by chain: {:?}", e); + if e.is_bad_data() { + return Ok(false); + } else { + Err(e) + } } } - true } - fn locate_headers(&self, locator: &[Hash]) -> Vec { + fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { debug!("locator: {:?}", locator); let header = match self.find_common_header(locator) { Some(header) => header, - None => return vec![], + None => return Ok(vec![]), }; - let max_height = self.chain().header_head().unwrap().height; + let max_height = self.chain().header_head()?.height; let txhashset = self.chain().txhashset(); let txhashset = txhashset.read(); @@ -283,7 +303,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { debug!("returning headers: {}", headers.len()); - headers + Ok(headers) } /// Gets a full block by its hash. @@ -348,11 +368,16 @@ impl p2p::ChainAdapter for NetToChainAdapter { /// If we're willing to accept that new state, the data stream will be /// read as a zip file, unzipped and the resulting state files should be /// rewound to the provided indexes. - fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: PeerAddr) -> bool { + fn txhashset_write( + &self, + h: Hash, + txhashset_data: File, + _peer_addr: PeerAddr, + ) -> Result { // check status again after download, in case 2 txhashsets made it somehow if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { } else { - return true; + return Ok(true); } if let Err(e) = self @@ -361,12 +386,13 @@ impl p2p::ChainAdapter for NetToChainAdapter { { self.chain().clean_txhashset_sandbox(); error!("Failed to save txhashset archive: {}", e); + let is_good_data = !e.is_bad_data(); self.sync_state.set_sync_error(types::Error::Chain(e)); - is_good_data + Ok(is_good_data) } else { info!("Received valid txhashset data for {}.", h); - true + Ok(true) } } } @@ -428,15 +454,20 @@ impl NetToChainAdapter { // pushing the new block through the chain pipeline // remembering to reset the head if we have a bad block - fn process_block(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool { + fn process_block( + &self, + b: core::Block, + addr: PeerAddr, + was_requested: bool, + ) -> Result { // We cannot process blocks earlier than the horizon so check for this here. { - let head = self.chain().head().unwrap(); + let head = self.chain().head()?; let horizon = head .height .saturating_sub(global::cut_through_horizon() as u64); if b.header.height < horizon { - return true; + return Ok(true); } } @@ -450,11 +481,11 @@ impl NetToChainAdapter { Ok(_) => { self.validate_chain(bhash); self.check_compact(); - true + Ok(true) } Err(ref e) if e.is_bad_data() => { self.validate_chain(bhash); - false + Ok(false) } Err(e) => { match e.kind() { @@ -468,7 +499,7 @@ impl NetToChainAdapter { self.request_block_by_hash(previous.hash(), addr) } } - true + Ok(true) } _ => { debug!( @@ -476,7 +507,7 @@ impl NetToChainAdapter { bhash, e.kind() ); - true + Ok(true) } } } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 5242b7be8..f07794f72 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -129,8 +129,12 @@ pub fn connect_and_monitor( if Utc::now() - prev_ping > Duration::seconds(10) { let total_diff = peers.total_difficulty(); let total_height = peers.total_height(); - peers.check_all(total_diff, total_height); - prev_ping = Utc::now(); + if total_diff.is_ok() && total_height.is_ok() { + peers.check_all(total_diff.unwrap(), total_height.unwrap()); + prev_ping = Utc::now(); + } else { + error!("failed to get peers difficulty and/or height"); + } } thread::sleep(time::Duration::from_secs(1)); diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 37403c6c0..15c2b9332 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -100,7 +100,7 @@ impl BodySync { hashes.reverse(); - let peers = self.peers.more_work_peers(); + let peers = self.peers.more_work_peers()?; // if we have 5 peers to sync from then ask for 50 blocks total (peer_count * // 10) max will be 80 if all 8 peers are advertising more work diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index f9230e7da..439d8fbb3 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -77,7 +77,7 @@ impl SyncRunner { let mut n = 0; const MIN_PEERS: usize = 3; loop { - let wp = self.peers.more_or_same_work_peers(); + let wp = self.peers.more_or_same_work_peers()?; // exit loop when: // * we have more than MIN_PEERS more_or_same_work peers // * we are synced already, e.g. grin was quickly restarted