P2p read write (#1993)

Refactor some p2p to use Read/Write over TcpStream
This commit is contained in:
Ignotus Peverell 2018-11-16 09:57:16 -08:00 committed by GitHub
commit 5f67af52d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 41 deletions

View file

@ -39,6 +39,7 @@ pub trait MessageHandler: Send + 'static {
fn consume<'a>( fn consume<'a>(
&self, &self,
msg: Message<'a>, msg: Message<'a>,
writer: &'a mut Write,
received_bytes: Arc<RwLock<RateCounter>>, received_bytes: Arc<RwLock<RateCounter>>,
) -> Result<Option<Response<'a>>, Error>; ) -> Result<Option<Response<'a>>, Error>;
} }
@ -62,23 +63,23 @@ macro_rules! try_break {
/// header lazily consumes the message body, handling its deserialization. /// header lazily consumes the message body, handling its deserialization.
pub struct Message<'a> { pub struct Message<'a> {
pub header: MsgHeader, pub header: MsgHeader,
conn: &'a mut TcpStream, stream: &'a mut Read,
} }
impl<'a> Message<'a> { impl<'a> Message<'a> {
fn from_header(header: MsgHeader, conn: &'a mut TcpStream) -> Message<'a> { fn from_header(header: MsgHeader, stream: &'a mut Read) -> Message<'a> {
Message { header, conn } Message { header, stream }
} }
/// Read the message body from the underlying connection /// Read the message body from the underlying connection
pub fn body<T: ser::Readable>(&mut self) -> Result<T, Error> { pub fn body<T: ser::Readable>(&mut self) -> Result<T, Error> {
read_body(&self.header, self.conn) read_body(&self.header, self.stream)
} }
/// Read a single "thing" from the underlying connection. /// Read a single "thing" from the underlying connection.
/// Return the thing and the total bytes read. /// Return the thing and the total bytes read.
pub fn streaming_read<T: ser::Readable>(&mut self) -> Result<(T, u64), Error> { pub fn streaming_read<T: ser::Readable>(&mut self) -> Result<(T, u64), Error> {
read_item(self.conn) read_item(self.stream)
} }
pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<usize, Error> { pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<usize, Error> {
@ -87,7 +88,7 @@ impl<'a> Message<'a> {
let read_len = cmp::min(8000, len - written); let read_len = cmp::min(8000, len - written);
let mut buf = vec![0u8; read_len]; let mut buf = vec![0u8; read_len];
read_exact( read_exact(
&mut self.conn, &mut self.stream,
&mut buf[..], &mut buf[..],
time::Duration::from_secs(10), time::Duration::from_secs(10),
true, true,
@ -97,36 +98,32 @@ impl<'a> Message<'a> {
} }
Ok(written) Ok(written)
} }
/// Respond to the message with the provided message type and body
pub fn respond<T>(self, resp_type: Type, body: T) -> Response<'a>
where
T: ser::Writeable,
{
let body = ser::ser_vec(&body).unwrap();
Response {
resp_type: resp_type,
body: body,
conn: self.conn,
attachment: None,
}
}
} }
/// Response to a `Message` /// Response to a `Message`.
pub struct Response<'a> { pub struct Response<'a> {
resp_type: Type, resp_type: Type,
body: Vec<u8>, body: Vec<u8>,
conn: &'a mut TcpStream, stream: &'a mut Write,
attachment: Option<File>, attachment: Option<File>,
} }
impl<'a> Response<'a> { impl<'a> Response<'a> {
pub fn new<T: ser::Writeable>(resp_type: Type, body: T, stream: &'a mut Write) -> Response<'a> {
let body = ser::ser_vec(&body).unwrap();
Response {
resp_type,
body,
stream,
attachment: None,
}
}
fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> { fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
let mut msg = let mut msg =
ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap(); ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap();
msg.append(&mut self.body); msg.append(&mut self.body);
write_all(&mut self.conn, &msg[..], time::Duration::from_secs(10))?; write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
// Increase sent bytes counter // Increase sent bytes counter
{ {
let mut sent_bytes = sent_bytes.write(); let mut sent_bytes = sent_bytes.write();
@ -138,7 +135,7 @@ impl<'a> Response<'a> {
match file.read(&mut buf[..]) { match file.read(&mut buf[..]) {
Ok(0) => break, Ok(0) => break,
Ok(n) => { Ok(n) => {
write_all(&mut self.conn, &buf[..n], time::Duration::from_secs(10))?; write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?;
// Increase sent bytes "quietly" without incrementing the counter. // Increase sent bytes "quietly" without incrementing the counter.
// (In a loop here for the single attachment). // (In a loop here for the single attachment).
let mut sent_bytes = sent_bytes.write(); let mut sent_bytes = sent_bytes.write();
@ -237,18 +234,20 @@ fn poll<H>(
) where ) where
H: MessageHandler, H: MessageHandler,
{ {
let mut conn = conn; // Split out tcp stream out into separate reader/writer halves.
let mut reader = conn.try_clone().expect("clone conn for reader failed");
let mut writer = conn.try_clone().expect("clone conn for writer failed");
let _ = thread::Builder::new() let _ = thread::Builder::new()
.name("peer".to_string()) .name("peer".to_string())
.spawn(move || { .spawn(move || {
let sleep_time = time::Duration::from_millis(1); let sleep_time = time::Duration::from_millis(1);
let conn = &mut conn;
let mut retry_send = Err(()); let mut retry_send = Err(());
loop { loop {
// check the read end // check the read end
if let Some(h) = try_break!(error_tx, read_header(conn, None)) { if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) {
let msg = Message::from_header(h, conn); let msg = Message::from_header(h, &mut reader);
trace!( trace!(
"Received message header, type {:?}, len {}.", "Received message header, type {:?}, len {}.",
msg.header.msg_type, msg.header.msg_type,
@ -262,7 +261,9 @@ fn poll<H>(
received_bytes.inc(MsgHeader::LEN as u64 + msg.header.msg_len); received_bytes.inc(MsgHeader::LEN as u64 + msg.header.msg_len);
} }
if let Some(Some(resp)) = try_break!(error_tx, handler.consume(msg, received)) { if let Some(Some(resp)) =
try_break!(error_tx, handler.consume(msg, &mut writer, received))
{
try_break!(error_tx, resp.write(sent_bytes.clone())); try_break!(error_tx, resp.write(sent_bytes.clone()));
} }
} }
@ -272,7 +273,7 @@ fn poll<H>(
retry_send = Err(()); retry_send = Err(());
if let Ok(data) = maybe_data { if let Ok(data) = maybe_data {
let written = let written =
try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from)); try_break!(error_tx, writer.write_all(&data[..]).map_err(&From::from));
if written.is_none() { if written.is_none() {
retry_send = Ok(data); retry_send = Ok(data);
} }

View file

@ -15,7 +15,7 @@
use std::cmp; use std::cmp;
use std::env; use std::env;
use std::fs::File; use std::fs::File;
use std::io::BufWriter; use std::io::{BufWriter, Write};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -45,6 +45,7 @@ impl MessageHandler for Protocol {
fn consume<'a>( fn consume<'a>(
&self, &self,
mut msg: Message<'a>, mut msg: Message<'a>,
writer: &'a mut Write,
received_bytes: Arc<RwLock<RateCounter>>, received_bytes: Arc<RwLock<RateCounter>>,
) -> Result<Option<Response<'a>>, Error> { ) -> Result<Option<Response<'a>>, Error> {
let adapter = &self.adapter; let adapter = &self.adapter;
@ -65,12 +66,13 @@ impl MessageHandler for Protocol {
let ping: Ping = msg.body()?; let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height); adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height);
Ok(Some(msg.respond( Ok(Some(Response::new(
Type::Pong, Type::Pong,
Pong { Pong {
total_difficulty: adapter.total_difficulty(), total_difficulty: adapter.total_difficulty(),
height: adapter.total_height(), height: adapter.total_height(),
}, },
writer,
))) )))
} }
@ -104,7 +106,7 @@ impl MessageHandler for Protocol {
); );
let tx = adapter.get_transaction(h); let tx = adapter.get_transaction(h);
if let Some(tx) = tx { if let Some(tx) = tx {
Ok(Some(msg.respond(Type::Transaction, tx))) Ok(Some(Response::new(Type::Transaction, tx, writer)))
} else { } else {
Ok(None) Ok(None)
} }
@ -140,7 +142,7 @@ impl MessageHandler for Protocol {
let bo = adapter.get_block(h); let bo = adapter.get_block(h);
if let Some(b) = bo { if let Some(b) = bo {
return Ok(Some(msg.respond(Type::Block, b))); return Ok(Some(Response::new(Type::Block, b, writer)));
} }
Ok(None) Ok(None)
} }
@ -160,7 +162,7 @@ impl MessageHandler for Protocol {
let h: Hash = msg.body()?; let h: Hash = msg.body()?;
if let Some(b) = adapter.get_block(h) { if let Some(b) = adapter.get_block(h) {
let cb: CompactBlock = b.into(); let cb: CompactBlock = b.into();
Ok(Some(msg.respond(Type::CompactBlock, cb))) Ok(Some(Response::new(Type::CompactBlock, cb, writer)))
} else { } else {
Ok(None) Ok(None)
} }
@ -183,9 +185,11 @@ impl MessageHandler for Protocol {
let headers = adapter.locate_headers(loc.hashes); let headers = adapter.locate_headers(loc.hashes);
// serialize and send all the headers over // serialize and send all the headers over
Ok(Some( Ok(Some(Response::new(
msg.respond(Type::Headers, Headers { headers: headers }), Type::Headers,
)) Headers { headers },
writer,
)))
} }
// "header first" block propagation - if we have not yet seen this block // "header first" block propagation - if we have not yet seen this block
@ -226,11 +230,12 @@ impl MessageHandler for Protocol {
Type::GetPeerAddrs => { Type::GetPeerAddrs => {
let get_peers: GetPeerAddrs = msg.body()?; let get_peers: GetPeerAddrs = msg.body()?;
let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities); let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities);
Ok(Some(msg.respond( Ok(Some(Response::new(
Type::PeerAddrs, Type::PeerAddrs,
PeerAddrs { PeerAddrs {
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
}, },
writer,
))) )))
} }
@ -251,13 +256,14 @@ impl MessageHandler for Protocol {
if let Some(txhashset) = txhashset { if let Some(txhashset) = txhashset {
let file_sz = txhashset.reader.metadata()?.len(); let file_sz = txhashset.reader.metadata()?.len();
let mut resp = msg.respond( let mut resp = Response::new(
Type::TxHashSetArchive, Type::TxHashSetArchive,
&TxHashSetArchive { &TxHashSetArchive {
height: sm_req.height as u64, height: sm_req.height as u64,
hash: sm_req.hash, hash: sm_req.hash,
bytes: file_sz, bytes: file_sz,
}, },
writer,
); );
resp.add_attachment(txhashset.reader); resp.add_attachment(txhashset.reader);
Ok(Some(resp)) Ok(Some(resp))