Reduce usage of unwrap in p2p crate (#2627)

Also change store crate a bit
This commit is contained in:
hashmap 2019-02-25 19:48:54 +01:00 committed by GitHub
parent 224a315dd1
commit fe9fa51f32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 112 additions and 132 deletions

View file

@ -113,19 +113,18 @@ impl<'a> Response<'a> {
resp_type: Type,
body: T,
stream: &'a mut dyn Write,
) -> Response<'a> {
let body = ser::ser_vec(&body).unwrap();
Response {
) -> Result<Response<'a>, Error> {
let body = ser::ser_vec(&body)?;
Ok(Response {
resp_type,
body,
stream,
attachment: None,
}
})
}
fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
let mut msg =
ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap();
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?;
msg.append(&mut self.body);
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
// Increase sent bytes counter
@ -177,7 +176,7 @@ impl Tracker {
where
T: ser::Writeable,
{
let buf = write_to_buf(body, msg_type);
let buf = write_to_buf(body, msg_type)?;
let buf_len = buf.len();
self.send_channel.try_send(buf)?;

View file

@ -160,18 +160,18 @@ pub fn read_message<T: Readable>(stream: &mut dyn Read, msg_type: Type) -> Resul
read_body(&header, stream)
}
pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Vec<u8> {
pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Result<Vec<u8>, Error> {
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg).unwrap();
ser::serialize(&mut body_buf, &msg)?;
// build and serialize the header using the body size
let mut msg_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen)).unwrap();
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen))?;
msg_buf.append(&mut body_buf);
msg_buf
Ok(msg_buf)
}
pub fn write_message<T: Writeable>(
@ -179,7 +179,7 @@ pub fn write_message<T: Writeable>(
msg: T,
msg_type: Type,
) -> Result<(), Error> {
let buf = write_to_buf(msg, msg_type);
let buf = write_to_buf(msg, msg_type)?;
stream.write_all(&buf[..])?;
Ok(())
}
@ -268,11 +268,11 @@ impl Writeable for Hand {
[write_u32, self.capabilities.bits()],
[write_u64, self.nonce]
);
self.total_difficulty.write(writer).unwrap();
self.sender_addr.write(writer).unwrap();
self.receiver_addr.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.sender_addr.write(writer)?;
self.receiver_addr.write(writer)?;
writer.write_bytes(&self.user_agent)?;
self.genesis.write(writer)?;
Ok(())
}
}
@ -323,9 +323,9 @@ impl Writeable for Shake {
[write_u32, self.version],
[write_u32, self.capabilities.bits()]
);
self.total_difficulty.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
self.total_difficulty.write(writer)?;
writer.write_bytes(&self.user_agent)?;
self.genesis.write(writer)?;
Ok(())
}
}
@ -379,7 +379,7 @@ impl Writeable for PeerAddrs {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u32(self.peers.len() as u32)?;
for p in &self.peers {
p.write(writer).unwrap();
p.write(writer)?;
}
Ok(())
}
@ -484,8 +484,8 @@ pub struct Ping {
impl Writeable for Ping {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
self.height.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.height.write(writer)?;
Ok(())
}
}
@ -511,8 +511,8 @@ pub struct Pong {
impl Writeable for Pong {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
self.height.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.height.write(writer)?;
Ok(())
}
}
@ -537,7 +537,7 @@ pub struct BanReason {
impl Writeable for BanReason {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
let ban_reason_i32 = self.ban_reason as i32;
ban_reason_i32.write(writer).unwrap();
ban_reason_i32.write(writer)?;
Ok(())
}
}

View file

@ -54,6 +54,15 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}
macro_rules! connection {
($holder:expr) => {
match $holder.connection.as_ref() {
Some(conn) => conn.lock(),
None => return Err(Error::Internal),
}
};
}
impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
@ -233,29 +242,15 @@ impl Peer {
total_difficulty,
height,
};
self.connection
.as_ref()
.unwrap()
.lock()
.send(ping_msg, msg::Type::Ping)
connection!(self).send(ping_msg, msg::Type::Ping)
}
/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) {
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
match self
.connection
.as_ref()
.unwrap()
.lock()
connection!(self)
.send(ban_reason_msg, msg::Type::BanReason)
{
Ok(_) => debug!("Sent ban reason {:?} to {}", ban_reason, self.info.addr),
Err(e) => error!(
"Could not send ban reason {:?} to {}: {:?}",
ban_reason, self.info.addr, e
),
};
.map(|_| ())
}
/// Sends the provided block to the remote peer. The request may be dropped
@ -263,11 +258,7 @@ impl Peer {
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::Block)?;
connection!(self).send(b, msg::Type::Block)?;
Ok(true)
} else {
debug!(
@ -282,11 +273,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::CompactBlock)?;
connection!(self).send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
@ -301,11 +288,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(bh, msg::Type::Header)?;
connection!(self).send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
@ -320,11 +303,7 @@ impl Peer {
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(h, msg::Type::TransactionKernel)?;
connection!(self).send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
@ -352,11 +331,7 @@ impl Peer {
if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::Transaction)?;
connection!(self).send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
@ -373,21 +348,12 @@ impl Peer {
/// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::StemTransaction)?;
Ok(())
connection!(self).send(tx, msg::Type::StemTransaction)
}
/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.connection
.as_ref()
.unwrap()
.lock()
.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}
pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
@ -395,37 +361,25 @@ impl Peer {
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetTransaction)
connection!(self).send(&h, msg::Type::GetTransaction)
}
/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetBlock)
connection!(self).send(&h, msg::Type::GetBlock)
}
/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetCompactBlock)
connection!(self).send(&h, msg::Type::GetCompactBlock)
}
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
self.connection.as_ref().unwrap().lock().send(
connection!(self).send(
&GetPeerAddrs {
capabilities: capab,
},
@ -438,7 +392,7 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.connection.as_ref().unwrap().lock().send(
connection!(self).send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
@ -446,11 +400,16 @@ impl Peer {
/// Stops the peer, closing its connection
pub fn stop(&self) {
stop_with_connection(&self.connection.as_ref().unwrap().lock());
if let Some(conn) = self.connection.as_ref() {
stop_with_connection(&conn.lock());
}
}
fn check_connection(&self) -> bool {
let connection = self.connection.as_ref().unwrap().lock();
let connection = match self.connection.as_ref() {
Some(conn) => conn.lock(),
None => return false,
};
match connection.error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let need_stop = {

View file

@ -217,11 +217,10 @@ impl Peers {
return vec![];
}
let max_total_difficulty = peers
.iter()
.map(|x| x.info.total_difficulty())
.max()
.unwrap();
let max_total_difficulty = match peers.iter().map(|x| x.info.total_difficulty()).max() {
Some(v) => v,
None => return vec![],
};
let mut max_peers = peers
.into_iter()
@ -256,7 +255,10 @@ impl Peers {
if let Some(peer) = self.get_connected_peer(peer_addr) {
debug!("Banning peer {}", peer_addr);
// setting peer status will get it removed at the next clean_peer
peer.send_ban_reason(ban_reason);
match peer.send_ban_reason(ban_reason) {
Err(e) => error!("failed to send a ban reason to{}: {:?}", peer_addr, e),
Ok(_) => debug!("ban reason {:?} was sent to {}", ban_reason, peer_addr),
};
peer.set_banned();
peer.stop();
}
@ -383,12 +385,24 @@ impl Peers {
/// All peer information we have in storage
pub fn all_peers(&self) -> Vec<PeerData> {
self.store.all_peers()
match self.store.all_peers() {
Ok(peers) => peers,
Err(e) => {
error!("all_peers failed: {:?}", e);
vec![]
}
}
}
/// Find peers in store (not necessarily connected) and return their data
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
self.store.find_peers(state, cap, count)
match self.store.find_peers(state, cap, count) {
Ok(peers) => peers,
Err(e) => {
error!("failed to find peers: {:?}", e);
vec![]
}
}
}
/// Get peer in store by address
@ -428,11 +442,12 @@ impl Peers {
debug!("clean_peers {:?}, not connected", peer.info.addr);
rm.push(peer.info.addr.clone());
} else if peer.is_abusive() {
let counts = peer.last_min_message_counts().unwrap();
if let Some(counts) = peer.last_min_message_counts() {
debug!(
"clean_peers {:?}, abusive ({} sent, {} recv)",
peer.info.addr, counts.0, counts.1,
);
}
let _ = self.update_state(peer.info.addr, State::Banned);
rm.push(peer.info.addr.clone());
} else {

View file

@ -72,7 +72,7 @@ impl MessageHandler for Protocol {
height: adapter.total_height(),
},
writer,
)))
)?))
}
Type::Pong => {
@ -105,7 +105,7 @@ impl MessageHandler for Protocol {
);
let tx = adapter.get_transaction(h);
if let Some(tx) = tx {
Ok(Some(Response::new(Type::Transaction, tx, writer)))
Ok(Some(Response::new(Type::Transaction, tx, writer)?))
} else {
Ok(None)
}
@ -141,7 +141,7 @@ impl MessageHandler for Protocol {
let bo = adapter.get_block(h);
if let Some(b) = bo {
return Ok(Some(Response::new(Type::Block, b, writer)));
return Ok(Some(Response::new(Type::Block, b, writer)?));
}
Ok(None)
}
@ -163,7 +163,7 @@ impl MessageHandler for Protocol {
let h: Hash = msg.body()?;
if let Some(b) = adapter.get_block(h) {
let cb: CompactBlock = b.into();
Ok(Some(Response::new(Type::CompactBlock, cb, writer)))
Ok(Some(Response::new(Type::CompactBlock, cb, writer)?))
} else {
Ok(None)
}
@ -190,7 +190,7 @@ impl MessageHandler for Protocol {
Type::Headers,
Headers { headers },
writer,
)))
)?))
}
// "header first" block propagation - if we have not yet seen this block
@ -235,7 +235,7 @@ impl MessageHandler for Protocol {
Type::PeerAddrs,
PeerAddrs { peers },
writer,
)))
)?))
}
Type::PeerAddrs => {
@ -263,7 +263,7 @@ impl MessageHandler for Protocol {
bytes: file_sz,
},
writer,
);
)?;
resp.add_attachment(txhashset.reader);
Ok(Some(resp))
} else {
@ -312,7 +312,10 @@ impl MessageHandler for Protocol {
received_bytes.inc_quiet(size as u64);
}
}
tmp_zip.into_inner().unwrap().sync_all()?;
tmp_zip
.into_inner()
.map_err(|_| Error::Internal)?
.sync_all()?;
Ok(())
};

View file

@ -85,10 +85,9 @@ impl Readable for PeerData {
let lc = reader.read_i64();
// this only works because each PeerData is read in its own vector and this
// is the last data element
let last_connected = if let Err(_) = lc {
Utc::now().timestamp()
} else {
lc.unwrap()
let last_connected = match lc {
Err(_) => Utc::now().timestamp(),
Ok(lc) => lc,
};
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
@ -149,22 +148,26 @@ impl PeerStore {
batch.commit()
}
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
pub fn find_peers(
&self,
state: State,
cap: Capabilities,
count: usize,
) -> Result<Vec<PeerData>, Error> {
let mut peers = self
.db
.iter::<PeerData>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()))
.unwrap()
.iter::<PeerData>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()))?
.filter(|p| p.flags == state && p.capabilities.contains(cap))
.collect::<Vec<_>>();
thread_rng().shuffle(&mut peers[..]);
peers.iter().take(count).cloned().collect()
Ok(peers.iter().take(count).cloned().collect())
}
/// List all known peers
/// Used for /v1/peers/all api endpoint
pub fn all_peers(&self) -> Vec<PeerData> {
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
let key = to_key(PEER_PREFIX, &mut "".to_string().into_bytes());
self.db.iter::<PeerData>(&key).unwrap().collect::<Vec<_>>()
Ok(self.db.iter::<PeerData>(&key)?.collect::<Vec<_>>())
}
/// Convenience method to load a peer data, update its status and save it
@ -192,7 +195,7 @@ impl PeerStore {
{
let mut to_remove = vec![];
for x in self.all_peers() {
for x in self.all_peers()? {
if predicate(&x) {
to_remove.push(x)
}

View file

@ -75,6 +75,7 @@ pub enum Error {
},
Send(String),
PeerException,
Internal,
}
impl From<ser::Error> for Error {

View file

@ -166,7 +166,7 @@ impl Store {
/// provided key.
pub fn iter<T: ser::Readable>(&self, from: &[u8]) -> Result<SerIterator<T>, Error> {
let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
let cursor = Arc::new(tx.cursor(self.db.clone()).unwrap());
let cursor = Arc::new(tx.cursor(self.db.clone())?);
Ok(SerIterator {
tx,
cursor,