Return Result from methods of ChainAdapter (#2722)

Most of the methods return nothing or bool which is used to decide if a
sender of a message should be banned or not. However underlying chain
implementation may fail so we need a way to reflect this fact in API.

Also it allows to reduce number of unwraps and makes the code more robust.
This commit is contained in:
hashmap 2019-04-08 22:13:28 +02:00 committed by GitHub
parent cdc17c6cd9
commit 94732b0d58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 303 additions and 156 deletions

1
Cargo.lock generated
View file

@ -853,6 +853,7 @@ dependencies = [
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"grin_chain 1.0.3",
"grin_core 1.0.3",
"grin_pool 1.0.3",
"grin_store 1.0.3",

View file

@ -24,6 +24,7 @@ chrono = { version = "0.4.4", features = ["serde"] }
grin_core = { path = "../core", version = "1.0.3" }
grin_store = { path = "../store", version = "1.0.3" }
grin_chain = { path = "../chain", version = "1.0.3" }
grin_util = { path = "../util", version = "1.0.3" }
[dev-dependencies]

View file

@ -29,6 +29,7 @@ use lmdb_zero as lmdb;
#[macro_use]
extern crate grin_core as core;
use grin_chain as chain;
use grin_util as util;
#[macro_use]

View file

@ -17,6 +17,7 @@ use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;
use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty;
@ -513,11 +514,11 @@ impl TrackingAdapter {
}
impl ChainAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Difficulty {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}
fn total_height(&self) -> u64 {
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}
@ -525,12 +526,16 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_transaction(kernel_hash)
}
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
// Do not track the tx hash for stem txs.
// Otherwise we fail to handle the subsequent fluff or embargo expiration
// correctly.
@ -541,27 +546,40 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.transaction_received(tx, stem)
}
fn block_received(&self, b: core::Block, addr: PeerAddr, _was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
_was_requested: bool,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
}
fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr)
}
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, addr)
}
fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool {
fn headers_received(
&self,
bh: &[core::BlockHeader],
addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, addr)
}
fn locate_headers(&self, locator: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(locator)
}
@ -577,7 +595,12 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.txhashset_receive_ready()
}
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
}

View file

@ -19,6 +19,7 @@ use std::sync::Arc;
use rand::{thread_rng, Rng};
use crate::chain;
use crate::core::core;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::global;
@ -171,13 +172,13 @@ impl Peers {
// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Vec<Arc<Peer>> {
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
return Ok(vec![]);
}
let total_difficulty = self.total_difficulty();
let total_difficulty = self.total_difficulty()?;
let mut max_peers = peers
.into_iter()
@ -185,28 +186,34 @@ impl Peers {
.collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
Ok(max_peers)
}
// Return number of connected peers that currently advertise more/same work
// (total_difficulty) than/as we do.
pub fn more_or_same_work_peers(&self) -> usize {
pub fn more_or_same_work_peers(&self) -> Result<usize, chain::Error> {
let peers = self.connected_peers();
if peers.len() == 0 {
return 0;
return Ok(0);
}
let total_difficulty = self.total_difficulty();
let total_difficulty = self.total_difficulty()?;
peers
Ok(peers
.iter()
.filter(|x| x.info.total_difficulty() >= total_difficulty)
.count()
.count())
}
/// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
self.more_work_peers().pop()
match self.more_work_peers() {
Ok(mut peers) => peers.pop(),
Err(e) => {
error!("failed to get more work peers: {:?}", e);
None
}
}
}
/// Return vec of connected peers that currently have the most worked
@ -452,12 +459,17 @@ impl Peers {
rm.push(peer.info.addr.clone());
} else {
let (stuck, diff) = peer.is_stuck();
if stuck && diff < self.adapter.total_difficulty() {
match self.adapter.total_difficulty() {
Ok(total_difficulty) => {
if stuck && diff < total_difficulty {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
let _ = self.update_state(peer.info.addr, State::Defunct);
rm.push(peer.info.addr.clone());
}
}
Err(e) => error!("failed to get total difficulty: {:?}", e),
}
}
}
// ensure we do not still have too many connected peers
@ -529,11 +541,11 @@ impl Peers {
}
impl ChainAdapter for Peers {
fn total_difficulty(&self) -> Difficulty {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}
fn total_height(&self) -> u64 {
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}
@ -541,17 +553,26 @@ impl ChainAdapter for Peers {
self.adapter.get_transaction(kernel_hash)
}
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, addr)
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
self.adapter.transaction_received(tx, stem)
}
fn block_received(&self, b: core::Block, peer_addr: PeerAddr, was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
peer_addr: PeerAddr,
was_requested: bool,
) -> Result<bool, chain::Error> {
let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested) {
if !self.adapter.block_received(b, peer_addr, was_requested)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
@ -559,15 +580,19 @@ impl ChainAdapter for Peers {
hash, peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadBlock);
false
Ok(false)
} else {
true
Ok(true)
}
}
fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr) {
if !self.adapter.compact_block_received(cb, peer_addr)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
@ -575,35 +600,43 @@ impl ChainAdapter for Peers {
hash, peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock);
false
Ok(false)
} else {
true
Ok(true)
}
}
fn header_received(&self, bh: core::BlockHeader, peer_addr: PeerAddr) -> bool {
if !self.adapter.header_received(bh, peer_addr) {
fn header_received(
&self,
bh: core::BlockHeader,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.header_received(bh, peer_addr)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false
Ok(false)
} else {
true
Ok(true)
}
}
fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: PeerAddr) -> bool {
if !self.adapter.headers_received(headers, peer_addr) {
fn headers_received(
&self,
headers: &[core::BlockHeader],
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.headers_received(headers, peer_addr)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false
Ok(false)
} else {
true
Ok(true)
}
}
fn locate_headers(&self, hs: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, hs: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(hs)
}
@ -619,16 +652,21 @@ impl ChainAdapter for Peers {
self.adapter.txhashset_receive_ready()
}
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
&peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet);
false
Ok(false)
} else {
true
Ok(true)
}
}

View file

@ -68,8 +68,8 @@ impl MessageHandler for Protocol {
Ok(Some(Response::new(
Type::Pong,
Pong {
total_difficulty: adapter.total_difficulty(),
height: adapter.total_height(),
total_difficulty: adapter.total_difficulty()?,
height: adapter.total_height()?,
},
writer,
)?))
@ -93,7 +93,7 @@ impl MessageHandler for Protocol {
"handle_payload: received tx kernel: {}, msg_len: {}",
h, msg.header.msg_len
);
adapter.tx_kernel_received(h, self.addr);
adapter.tx_kernel_received(h, self.addr)?;
Ok(None)
}
@ -117,7 +117,7 @@ impl MessageHandler for Protocol {
msg.header.msg_len
);
let tx: core::Transaction = msg.body()?;
adapter.transaction_received(tx, false);
adapter.transaction_received(tx, false)?;
Ok(None)
}
@ -127,7 +127,7 @@ impl MessageHandler for Protocol {
msg.header.msg_len
);
let tx: core::Transaction = msg.body()?;
adapter.transaction_received(tx, true);
adapter.transaction_received(tx, true)?;
Ok(None)
}
@ -155,7 +155,7 @@ impl MessageHandler for Protocol {
// we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter
adapter.block_received(b, self.addr, false);
adapter.block_received(b, self.addr, false)?;
Ok(None)
}
@ -176,14 +176,14 @@ impl MessageHandler for Protocol {
);
let b: core::CompactBlock = msg.body()?;
adapter.compact_block_received(b, self.addr);
adapter.compact_block_received(b, self.addr)?;
Ok(None)
}
Type::GetHeaders => {
// load headers from the locator
let loc: Locator = msg.body()?;
let headers = adapter.locate_headers(&loc.hashes);
let headers = adapter.locate_headers(&loc.hashes)?;
// serialize and send all the headers over
Ok(Some(Response::new(
@ -197,7 +197,7 @@ impl MessageHandler for Protocol {
// we can go request it from some of our peers
Type::Header => {
let header: core::BlockHeader = msg.body()?;
adapter.header_received(header, self.addr);
adapter.header_received(header, self.addr)?;
Ok(None)
}
@ -217,7 +217,7 @@ impl MessageHandler for Protocol {
headers.push(header);
total_bytes_read += bytes_read;
}
adapter.headers_received(&headers, self.addr);
adapter.headers_received(&headers, self.addr)?;
}
// Now check we read the correct total number of bytes off the stream.
@ -335,7 +335,7 @@ impl MessageHandler for Protocol {
let tmp_zip = File::open(tmp)?;
let res = self
.adapter
.txhashset_write(sm_arch.hash, tmp_zip, self.addr);
.txhashset_write(sm_arch.hash, tmp_zip, self.addr)?;
debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",

View file

@ -12,19 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::time::Duration;
use std::{io, thread};
use crate::lmdb;
use crate::chain;
use crate::core::core;
use crate::core::core::hash::Hash;
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::handshake::Handshake;
use crate::lmdb;
use crate::peer::Peer;
use crate::peers::Peers;
use crate::store::PeerStore;
@ -33,6 +27,11 @@ use crate::types::{
};
use crate::util::{Mutex, StopState};
use chrono::prelude::{DateTime, Utc};
use std::fs::File;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::time::Duration;
use std::{io, thread};
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
@ -139,7 +138,7 @@ impl Server {
match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(mut stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty();
let total_diff = self.peers.total_difficulty()?;
let mut peer = Peer::connect(
&mut stream,
@ -168,7 +167,7 @@ impl Server {
}
fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty();
let total_diff = self.peers.total_difficulty()?;
// accept the peer and add it to the server map
let mut peer = Peer::accept(
@ -231,31 +230,48 @@ impl Server {
pub struct DummyAdapter {}
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Difficulty {
Difficulty::min()
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
Ok(Difficulty::min())
}
fn total_height(&self) -> u64 {
0
fn total_height(&self) -> Result<u64, chain::Error> {
Ok(0)
}
fn get_transaction(&self, _h: Hash) -> Option<core::Transaction> {
None
}
fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) {}
fn transaction_received(&self, _: core::Transaction, _stem: bool) {}
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: PeerAddr) -> bool {
true
fn tx_kernel_received(&self, _h: Hash, _addr: PeerAddr) -> Result<bool, chain::Error> {
Ok(true)
}
fn header_received(&self, _bh: core::BlockHeader, _addr: PeerAddr) -> bool {
true
fn transaction_received(
&self,
_: core::Transaction,
_stem: bool,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> bool {
true
fn compact_block_received(
&self,
_cb: core::CompactBlock,
_addr: PeerAddr,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> bool {
true
fn header_received(
&self,
_bh: core::BlockHeader,
_addr: PeerAddr,
) -> Result<bool, chain::Error> {
Ok(true)
}
fn locate_headers(&self, _: &[Hash]) -> Vec<core::BlockHeader> {
vec![]
fn block_received(&self, _: core::Block, _: PeerAddr, _: bool) -> Result<bool, chain::Error> {
Ok(true)
}
fn headers_received(&self, _: &[core::BlockHeader], _: PeerAddr) -> Result<bool, chain::Error> {
Ok(true)
}
fn locate_headers(&self, _: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Ok(vec![])
}
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
@ -268,8 +284,13 @@ impl ChainAdapter for DummyAdapter {
false
}
fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: PeerAddr) -> bool {
false
fn txhashset_write(
&self,
_h: Hash,
_txhashset_data: File,
_peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
Ok(false)
}
fn txhashset_download_update(

View file

@ -168,7 +168,8 @@ impl PeerStore {
/// Used for /v1/peers/all api endpoint
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes());
Ok(self.db
Ok(self
.db
.iter::<PeerData>(&key)?
.map(|(_, v)| v)
.collect::<Vec<_>>())

View file

@ -23,6 +23,7 @@ use std::sync::Arc;
use chrono::prelude::*;
use crate::chain;
use crate::core::core;
use crate::core::core::hash::Hash;
use crate::core::global;
@ -63,6 +64,7 @@ pub enum Error {
ConnectionClose,
Timeout,
Store(grin_store::Error),
Chain(chain::Error),
PeerWithSelf,
NoDandelionRelay,
ProtocolMismatch {
@ -88,6 +90,11 @@ impl From<grin_store::Error> for Error {
Error::Store(e)
}
}
impl From<chain::Error> for Error {
fn from(e: chain::Error) -> Error {
Error::Chain(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Error {
Error::Connection(e)
@ -465,37 +472,51 @@ pub struct TxHashSetRead {
/// other things.
pub trait ChainAdapter: Sync + Send {
/// Current total difficulty on our chain
fn total_difficulty(&self) -> Difficulty;
fn total_difficulty(&self) -> Result<Difficulty, chain::Error>;
/// Current total height
fn total_height(&self) -> u64;
fn total_height(&self) -> Result<u64, chain::Error>;
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction, stem: bool);
fn transaction_received(&self, tx: core::Transaction, stem: bool)
-> Result<bool, chain::Error>;
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction>;
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr);
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error>;
/// A block has been received from one of our peers. Returns true if the
/// block could be handled properly and is not deemed defective by the
/// chain. Returning false means the block will never be valid and
/// may result in the peer being banned.
fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool;
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
was_requested: bool,
) -> Result<bool, chain::Error>;
fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool;
fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
) -> Result<bool, chain::Error>;
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool;
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error>;
/// A set of block header has been received, typically in response to a
/// block
/// header request.
fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool;
fn headers_received(
&self,
bh: &[core::BlockHeader],
addr: PeerAddr,
) -> Result<bool, chain::Error>;
/// Finds a list of block headers based on the provided locator. Tries to
/// identify the common chain and gets the headers that follow it
/// immediately.
fn locate_headers(&self, locator: &[Hash]) -> Vec<core::BlockHeader>;
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error>;
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block>;
@ -523,7 +544,12 @@ pub trait ChainAdapter: Sync + Send {
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes.
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool;
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error>;
}
/// Additional methods required by the protocol that don't need to be

View file

@ -50,22 +50,22 @@ pub struct NetToChainAdapter {
}
impl p2p::ChainAdapter for NetToChainAdapter {
fn total_difficulty(&self) -> Difficulty {
self.chain().head().unwrap().total_difficulty
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
Ok(self.chain().head()?.total_difficulty)
}
fn total_height(&self) -> u64 {
self.chain().head().unwrap().height
fn total_height(&self) -> Result<u64, chain::Error> {
Ok(self.chain().head()?.height)
}
fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash)
}
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
// nothing much we can do with a new transaction while syncing
if self.sync_state.is_syncing() {
return;
return Ok(true);
}
let tx = self.tx_pool.read().retrieve_tx_by_kernel_hash(kernel_hash);
@ -73,12 +73,17 @@ impl p2p::ChainAdapter for NetToChainAdapter {
if tx.is_none() {
self.request_transaction(kernel_hash, addr);
}
Ok(true)
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
// nothing much we can do with a new transaction while syncing
if self.sync_state.is_syncing() {
return;
return Ok(true);
}
let source = pool::TxSource {
@ -87,7 +92,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
};
let tx_hash = tx.hash();
let header = self.chain().head_header().unwrap();
let header = self.chain().head_header()?;
debug!(
"Received tx {}, [in/out/kern: {}/{}/{}] going to process.",
@ -97,17 +102,22 @@ impl p2p::ChainAdapter for NetToChainAdapter {
tx.kernels().len(),
);
let res = {
let mut tx_pool = self.tx_pool.write();
tx_pool.add_to_pool(source, tx, stem, &header)
};
if let Err(e) = res {
match tx_pool.add_to_pool(source, tx, stem, &header) {
Ok(_) => Ok(true),
Err(e) => {
debug!("Transaction {} rejected: {:?}", tx_hash, e);
Ok(false)
}
}
}
fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
was_requested: bool,
) -> Result<bool, chain::Error> {
debug!(
"Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.",
b.hash(),
@ -120,7 +130,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
self.process_block(b, addr, was_requested)
}
fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
) -> Result<bool, chain::Error> {
let bhash = cb.hash();
debug!(
"Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.",
@ -139,7 +153,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
Ok(block) => self.process_block(block, addr, false),
Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
return false;
return Ok(false);
}
}
} else {
@ -149,7 +163,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.process_block_header(&cb.header, self.chain_opts(false))
{
debug!("Invalid compact block header {}: {:?}", cb_hash, e.kind());
return !e.is_bad_data();
return Ok(!e.is_bad_data());
}
let (txs, missing_short_ids) = {
@ -173,7 +187,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
Ok(block) => block,
Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb.hash(), e);
return false;
return Ok(false);
}
};
@ -188,20 +202,20 @@ impl p2p::ChainAdapter for NetToChainAdapter {
if self.sync_state.status() == SyncStatus::NoSync {
debug!("adapter: block invalid after hydration, requesting full block");
self.request_block(&cb.header, addr);
true
Ok(true)
} else {
debug!("block invalid after hydration, ignoring it, cause still syncing");
true
Ok(true)
}
}
} else {
debug!("failed to retrieve previous block header (still syncing?)");
true
Ok(true)
}
}
}
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> {
let bhash = bh.hash();
debug!(
"Received block header {} at {} from {}, going to process.",
@ -214,14 +228,14 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.chain()
.process_block_header(&bh, self.chain_opts(false));
if let &Err(ref e) = &res {
if let Err(e) = res {
debug!("Block header {} refused by chain: {:?}", bhash, e.kind());
if e.is_bad_data() {
return false;
return Ok(false);
} else {
// we got an error when trying to process the block header
// but nothing serious enough to need to ban the peer upstream
return true;
return Err(e);
}
}
@ -230,37 +244,43 @@ impl p2p::ChainAdapter for NetToChainAdapter {
self.request_compact_block(&bh, addr);
// done receiving the header
true
Ok(true)
}
fn headers_received(&self, bhs: &[core::BlockHeader], addr: PeerAddr) -> bool {
fn headers_received(
&self,
bhs: &[core::BlockHeader],
addr: PeerAddr,
) -> Result<bool, chain::Error> {
info!("Received {} block headers from {}", bhs.len(), addr,);
if bhs.len() == 0 {
return false;
return Ok(false);
}
// try to add headers to our header chain
let res = self.chain().sync_block_headers(bhs, self.chain_opts(true));
if let &Err(ref e) = &res {
match self.chain().sync_block_headers(bhs, self.chain_opts(true)) {
Ok(_) => Ok(true),
Err(e) => {
debug!("Block headers refused by chain: {:?}", e);
if e.is_bad_data() {
return false;
return Ok(false);
} else {
Err(e)
}
}
}
true
}
fn locate_headers(&self, locator: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
debug!("locator: {:?}", locator);
let header = match self.find_common_header(locator) {
Some(header) => header,
None => return vec![],
None => return Ok(vec![]),
};
let max_height = self.chain().header_head().unwrap().height;
let max_height = self.chain().header_head()?.height;
let txhashset = self.chain().txhashset();
let txhashset = txhashset.read();
@ -283,7 +303,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
debug!("returning headers: {}", headers.len());
headers
Ok(headers)
}
/// Gets a full block by its hash.
@ -348,11 +368,16 @@ impl p2p::ChainAdapter for NetToChainAdapter {
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes.
fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: PeerAddr) -> bool {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
_peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
// check status again after download, in case 2 txhashsets made it somehow
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else {
return true;
return Ok(true);
}
if let Err(e) = self
@ -361,12 +386,13 @@ impl p2p::ChainAdapter for NetToChainAdapter {
{
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: {}", e);
let is_good_data = !e.is_bad_data();
self.sync_state.set_sync_error(types::Error::Chain(e));
is_good_data
Ok(is_good_data)
} else {
info!("Received valid txhashset data for {}.", h);
true
Ok(true)
}
}
}
@ -428,15 +454,20 @@ impl NetToChainAdapter {
// pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool {
fn process_block(
&self,
b: core::Block,
addr: PeerAddr,
was_requested: bool,
) -> Result<bool, chain::Error> {
// We cannot process blocks earlier than the horizon so check for this here.
{
let head = self.chain().head().unwrap();
let head = self.chain().head()?;
let horizon = head
.height
.saturating_sub(global::cut_through_horizon() as u64);
if b.header.height < horizon {
return true;
return Ok(true);
}
}
@ -450,11 +481,11 @@ impl NetToChainAdapter {
Ok(_) => {
self.validate_chain(bhash);
self.check_compact();
true
Ok(true)
}
Err(ref e) if e.is_bad_data() => {
self.validate_chain(bhash);
false
Ok(false)
}
Err(e) => {
match e.kind() {
@ -468,7 +499,7 @@ impl NetToChainAdapter {
self.request_block_by_hash(previous.hash(), addr)
}
}
true
Ok(true)
}
_ => {
debug!(
@ -476,7 +507,7 @@ impl NetToChainAdapter {
bhash,
e.kind()
);
true
Ok(true)
}
}
}

View file

@ -129,8 +129,12 @@ pub fn connect_and_monitor(
if Utc::now() - prev_ping > Duration::seconds(10) {
let total_diff = peers.total_difficulty();
let total_height = peers.total_height();
peers.check_all(total_diff, total_height);
if total_diff.is_ok() && total_height.is_ok() {
peers.check_all(total_diff.unwrap(), total_height.unwrap());
prev_ping = Utc::now();
} else {
error!("failed to get peers difficulty and/or height");
}
}
thread::sleep(time::Duration::from_secs(1));

View file

@ -100,7 +100,7 @@ impl BodySync {
hashes.reverse();
let peers = self.peers.more_work_peers();
let peers = self.peers.more_work_peers()?;
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work

View file

@ -77,7 +77,7 @@ impl SyncRunner {
let mut n = 0;
const MIN_PEERS: usize = 3;
loop {
let wp = self.peers.more_or_same_work_peers();
let wp = self.peers.more_or_same_work_peers()?;
// exit loop when:
// * we have more than MIN_PEERS more_or_same_work peers
// * we are synced already, e.g. grin was quickly restarted