diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index d1d5d8062..06a8a206a 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -113,19 +113,18 @@ impl<'a> Response<'a> { resp_type: Type, body: T, stream: &'a mut dyn Write, - ) -> Response<'a> { - let body = ser::ser_vec(&body).unwrap(); - Response { + ) -> Result, Error> { + let body = ser::ser_vec(&body)?; + Ok(Response { resp_type, body, stream, attachment: None, - } + }) } fn write(mut self, sent_bytes: Arc>) -> Result<(), Error> { - let mut msg = - ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap(); + let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?; msg.append(&mut self.body); write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?; // Increase sent bytes counter @@ -177,7 +176,7 @@ impl Tracker { where T: ser::Writeable, { - let buf = write_to_buf(body, msg_type); + let buf = write_to_buf(body, msg_type)?; let buf_len = buf.len(); self.send_channel.try_send(buf)?; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 8e7b5d151..16fb81c28 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -160,18 +160,18 @@ pub fn read_message(stream: &mut dyn Read, msg_type: Type) -> Resul read_body(&header, stream) } -pub fn write_to_buf(msg: T, msg_type: Type) -> Vec { +pub fn write_to_buf(msg: T, msg_type: Type) -> Result, Error> { // prepare the body first so we know its serialized length let mut body_buf = vec![]; - ser::serialize(&mut body_buf, &msg).unwrap(); + ser::serialize(&mut body_buf, &msg)?; // build and serialize the header using the body size let mut msg_buf = vec![]; let blen = body_buf.len() as u64; - ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen)).unwrap(); + ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen))?; msg_buf.append(&mut body_buf); - msg_buf + Ok(msg_buf) } pub fn write_message( @@ -179,7 +179,7 @@ pub fn write_message( msg: T, msg_type: Type, ) -> Result<(), Error> { - let buf = write_to_buf(msg, msg_type); + let buf = write_to_buf(msg, msg_type)?; stream.write_all(&buf[..])?; Ok(()) } @@ -268,11 +268,11 @@ impl Writeable for Hand { [write_u32, self.capabilities.bits()], [write_u64, self.nonce] ); - self.total_difficulty.write(writer).unwrap(); - self.sender_addr.write(writer).unwrap(); - self.receiver_addr.write(writer).unwrap(); - writer.write_bytes(&self.user_agent).unwrap(); - self.genesis.write(writer).unwrap(); + self.total_difficulty.write(writer)?; + self.sender_addr.write(writer)?; + self.receiver_addr.write(writer)?; + writer.write_bytes(&self.user_agent)?; + self.genesis.write(writer)?; Ok(()) } } @@ -323,9 +323,9 @@ impl Writeable for Shake { [write_u32, self.version], [write_u32, self.capabilities.bits()] ); - self.total_difficulty.write(writer).unwrap(); - writer.write_bytes(&self.user_agent).unwrap(); - self.genesis.write(writer).unwrap(); + self.total_difficulty.write(writer)?; + writer.write_bytes(&self.user_agent)?; + self.genesis.write(writer)?; Ok(()) } } @@ -379,7 +379,7 @@ impl Writeable for PeerAddrs { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { writer.write_u32(self.peers.len() as u32)?; for p in &self.peers { - p.write(writer).unwrap(); + p.write(writer)?; } Ok(()) } @@ -484,8 +484,8 @@ pub struct Ping { impl Writeable for Ping { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - self.total_difficulty.write(writer).unwrap(); - self.height.write(writer).unwrap(); + self.total_difficulty.write(writer)?; + self.height.write(writer)?; Ok(()) } } @@ -511,8 +511,8 @@ pub struct Pong { impl Writeable for Pong { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { - self.total_difficulty.write(writer).unwrap(); - self.height.write(writer).unwrap(); + self.total_difficulty.write(writer)?; + self.height.write(writer)?; Ok(()) } } @@ -537,7 +537,7 @@ pub struct BanReason { impl Writeable for BanReason { fn write(&self, writer: &mut W) -> Result<(), ser::Error> { let ban_reason_i32 = self.ban_reason as i32; - ban_reason_i32.write(writer).unwrap(); + ban_reason_i32.write(writer)?; Ok(()) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1b50bf59a..72c66ec56 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -54,6 +54,15 @@ pub struct Peer { connection: Option>, } +macro_rules! connection { + ($holder:expr) => { + match $holder.connection.as_ref() { + Some(conn) => conn.lock(), + None => return Err(Error::Internal), + } + }; +} + impl Peer { // Only accept and connect can be externally used to build a peer fn new(info: PeerInfo, adapter: Arc) -> Peer { @@ -233,29 +242,15 @@ impl Peer { total_difficulty, height, }; - self.connection - .as_ref() - .unwrap() - .lock() - .send(ping_msg, msg::Type::Ping) + connection!(self).send(ping_msg, msg::Type::Ping) } /// Send the ban reason before banning - pub fn send_ban_reason(&self, ban_reason: ReasonForBan) { + pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> { let ban_reason_msg = BanReason { ban_reason }; - match self - .connection - .as_ref() - .unwrap() - .lock() + connection!(self) .send(ban_reason_msg, msg::Type::BanReason) - { - Ok(_) => debug!("Sent ban reason {:?} to {}", ban_reason, self.info.addr), - Err(e) => error!( - "Could not send ban reason {:?} to {}: {:?}", - ban_reason, self.info.addr, e - ), - }; + .map(|_| ()) } /// Sends the provided block to the remote peer. The request may be dropped @@ -263,11 +258,7 @@ impl Peer { pub fn send_block(&self, b: &core::Block) -> Result { if !self.tracking_adapter.has_recv(b.hash()) { trace!("Send block {} to {}", b.hash(), self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(b, msg::Type::Block)?; + connection!(self).send(b, msg::Type::Block)?; Ok(true) } else { debug!( @@ -282,11 +273,7 @@ impl Peer { pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result { if !self.tracking_adapter.has_recv(b.hash()) { trace!("Send compact block {} to {}", b.hash(), self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(b, msg::Type::CompactBlock)?; + connection!(self).send(b, msg::Type::CompactBlock)?; Ok(true) } else { debug!( @@ -301,11 +288,7 @@ impl Peer { pub fn send_header(&self, bh: &core::BlockHeader) -> Result { if !self.tracking_adapter.has_recv(bh.hash()) { debug!("Send header {} to {}", bh.hash(), self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(bh, msg::Type::Header)?; + connection!(self).send(bh, msg::Type::Header)?; Ok(true) } else { debug!( @@ -320,11 +303,7 @@ impl Peer { pub fn send_tx_kernel_hash(&self, h: Hash) -> Result { if !self.tracking_adapter.has_recv(h) { debug!("Send tx kernel hash {} to {}", h, self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(h, msg::Type::TransactionKernel)?; + connection!(self).send(h, msg::Type::TransactionKernel)?; Ok(true) } else { debug!( @@ -352,11 +331,7 @@ impl Peer { if !self.tracking_adapter.has_recv(kernel.hash()) { debug!("Send full tx {} to {}", tx.hash(), self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(tx, msg::Type::Transaction)?; + connection!(self).send(tx, msg::Type::Transaction)?; Ok(true) } else { debug!( @@ -373,21 +348,12 @@ impl Peer { /// embargo). pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(tx, msg::Type::StemTransaction)?; - Ok(()) + connection!(self).send(tx, msg::Type::StemTransaction) } /// Sends a request for block headers from the provided block locator pub fn send_header_request(&self, locator: Vec) -> Result<(), Error> { - self.connection - .as_ref() - .unwrap() - .lock() - .send(&Locator { hashes: locator }, msg::Type::GetHeaders) + connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders) } pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { @@ -395,37 +361,25 @@ impl Peer { "Requesting tx (kernel hash) {} from peer {}.", h, self.info.addr ); - self.connection - .as_ref() - .unwrap() - .lock() - .send(&h, msg::Type::GetTransaction) + connection!(self).send(&h, msg::Type::GetTransaction) } /// Sends a request for a specific block by hash pub fn send_block_request(&self, h: Hash) -> Result<(), Error> { debug!("Requesting block {} from peer {}.", h, self.info.addr); self.tracking_adapter.push_req(h); - self.connection - .as_ref() - .unwrap() - .lock() - .send(&h, msg::Type::GetBlock) + connection!(self).send(&h, msg::Type::GetBlock) } /// Sends a request for a specific compact block by hash pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> { debug!("Requesting compact block {} from {}", h, self.info.addr); - self.connection - .as_ref() - .unwrap() - .lock() - .send(&h, msg::Type::GetCompactBlock) + connection!(self).send(&h, msg::Type::GetCompactBlock) } pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> { trace!("Asking {} for more peers {:?}", self.info.addr, capab); - self.connection.as_ref().unwrap().lock().send( + connection!(self).send( &GetPeerAddrs { capabilities: capab, }, @@ -438,7 +392,7 @@ impl Peer { "Asking {} for txhashset archive at {} {}.", self.info.addr, height, hash ); - self.connection.as_ref().unwrap().lock().send( + connection!(self).send( &TxHashSetRequest { hash, height }, msg::Type::TxHashSetRequest, ) @@ -446,11 +400,16 @@ impl Peer { /// Stops the peer, closing its connection pub fn stop(&self) { - stop_with_connection(&self.connection.as_ref().unwrap().lock()); + if let Some(conn) = self.connection.as_ref() { + stop_with_connection(&conn.lock()); + } } fn check_connection(&self) -> bool { - let connection = self.connection.as_ref().unwrap().lock(); + let connection = match self.connection.as_ref() { + Some(conn) => conn.lock(), + None => return false, + }; match connection.error_channel.try_recv() { Ok(Error::Serialization(e)) => { let need_stop = { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 19713b7d8..06865e37b 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -217,11 +217,10 @@ impl Peers { return vec![]; } - let max_total_difficulty = peers - .iter() - .map(|x| x.info.total_difficulty()) - .max() - .unwrap(); + let max_total_difficulty = match peers.iter().map(|x| x.info.total_difficulty()).max() { + Some(v) => v, + None => return vec![], + }; let mut max_peers = peers .into_iter() @@ -256,7 +255,10 @@ impl Peers { if let Some(peer) = self.get_connected_peer(peer_addr) { debug!("Banning peer {}", peer_addr); // setting peer status will get it removed at the next clean_peer - peer.send_ban_reason(ban_reason); + match peer.send_ban_reason(ban_reason) { + Err(e) => error!("failed to send a ban reason to{}: {:?}", peer_addr, e), + Ok(_) => debug!("ban reason {:?} was sent to {}", ban_reason, peer_addr), + }; peer.set_banned(); peer.stop(); } @@ -383,12 +385,24 @@ impl Peers { /// All peer information we have in storage pub fn all_peers(&self) -> Vec { - self.store.all_peers() + match self.store.all_peers() { + Ok(peers) => peers, + Err(e) => { + error!("all_peers failed: {:?}", e); + vec![] + } + } } /// Find peers in store (not necessarily connected) and return their data pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - self.store.find_peers(state, cap, count) + match self.store.find_peers(state, cap, count) { + Ok(peers) => peers, + Err(e) => { + error!("failed to find peers: {:?}", e); + vec![] + } + } } /// Get peer in store by address @@ -428,11 +442,12 @@ impl Peers { debug!("clean_peers {:?}, not connected", peer.info.addr); rm.push(peer.info.addr.clone()); } else if peer.is_abusive() { - let counts = peer.last_min_message_counts().unwrap(); - debug!( - "clean_peers {:?}, abusive ({} sent, {} recv)", - peer.info.addr, counts.0, counts.1, - ); + if let Some(counts) = peer.last_min_message_counts() { + debug!( + "clean_peers {:?}, abusive ({} sent, {} recv)", + peer.info.addr, counts.0, counts.1, + ); + } let _ = self.update_state(peer.info.addr, State::Banned); rm.push(peer.info.addr.clone()); } else { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 38df07c75..cc49abd3f 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -72,7 +72,7 @@ impl MessageHandler for Protocol { height: adapter.total_height(), }, writer, - ))) + )?)) } Type::Pong => { @@ -105,7 +105,7 @@ impl MessageHandler for Protocol { ); let tx = adapter.get_transaction(h); if let Some(tx) = tx { - Ok(Some(Response::new(Type::Transaction, tx, writer))) + Ok(Some(Response::new(Type::Transaction, tx, writer)?)) } else { Ok(None) } @@ -141,7 +141,7 @@ impl MessageHandler for Protocol { let bo = adapter.get_block(h); if let Some(b) = bo { - return Ok(Some(Response::new(Type::Block, b, writer))); + return Ok(Some(Response::new(Type::Block, b, writer)?)); } Ok(None) } @@ -163,7 +163,7 @@ impl MessageHandler for Protocol { let h: Hash = msg.body()?; if let Some(b) = adapter.get_block(h) { let cb: CompactBlock = b.into(); - Ok(Some(Response::new(Type::CompactBlock, cb, writer))) + Ok(Some(Response::new(Type::CompactBlock, cb, writer)?)) } else { Ok(None) } @@ -190,7 +190,7 @@ impl MessageHandler for Protocol { Type::Headers, Headers { headers }, writer, - ))) + )?)) } // "header first" block propagation - if we have not yet seen this block @@ -235,7 +235,7 @@ impl MessageHandler for Protocol { Type::PeerAddrs, PeerAddrs { peers }, writer, - ))) + )?)) } Type::PeerAddrs => { @@ -263,7 +263,7 @@ impl MessageHandler for Protocol { bytes: file_sz, }, writer, - ); + )?; resp.add_attachment(txhashset.reader); Ok(Some(resp)) } else { @@ -312,7 +312,10 @@ impl MessageHandler for Protocol { received_bytes.inc_quiet(size as u64); } } - tmp_zip.into_inner().unwrap().sync_all()?; + tmp_zip + .into_inner() + .map_err(|_| Error::Internal)? + .sync_all()?; Ok(()) }; diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 074cc066f..dc857f189 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -85,10 +85,9 @@ impl Readable for PeerData { let lc = reader.read_i64(); // this only works because each PeerData is read in its own vector and this // is the last data element - let last_connected = if let Err(_) = lc { - Utc::now().timestamp() - } else { - lc.unwrap() + let last_connected = match lc { + Err(_) => Utc::now().timestamp(), + Ok(lc) => lc, }; let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; @@ -149,22 +148,26 @@ impl PeerStore { batch.commit() } - pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { + pub fn find_peers( + &self, + state: State, + cap: Capabilities, + count: usize, + ) -> Result, Error> { let mut peers = self .db - .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())) - .unwrap() + .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()))? .filter(|p| p.flags == state && p.capabilities.contains(cap)) .collect::>(); thread_rng().shuffle(&mut peers[..]); - peers.iter().take(count).cloned().collect() + Ok(peers.iter().take(count).cloned().collect()) } /// List all known peers /// Used for /v1/peers/all api endpoint - pub fn all_peers(&self) -> Vec { + pub fn all_peers(&self) -> Result, Error> { let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes()); - self.db.iter::(&key).unwrap().collect::>() + Ok(self.db.iter::(&key)?.collect::>()) } /// Convenience method to load a peer data, update its status and save it @@ -192,7 +195,7 @@ impl PeerStore { { let mut to_remove = vec![]; - for x in self.all_peers() { + for x in self.all_peers()? { if predicate(&x) { to_remove.push(x) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index c6cffefdd..48f308dc9 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -75,6 +75,7 @@ pub enum Error { }, Send(String), PeerException, + Internal, } impl From for Error { diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index 9cc5b32ef..be50a2539 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -166,7 +166,7 @@ impl Store { /// provided key. pub fn iter(&self, from: &[u8]) -> Result, Error> { let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?); - let cursor = Arc::new(tx.cursor(self.db.clone()).unwrap()); + let cursor = Arc::new(tx.cursor(self.db.clone())?); Ok(SerIterator { tx, cursor,