P2p ping and partial header reads fixes ()

* Re-introduce peer regular ping/pong
* Fix partial header reads. Turns out that on async tcp streams, even a small chunk like a
header can be read partially. So header read needs to rely on our
fixed-up `read_exact`, with an additional boolean to allow yield
when no bytes were read.
This commit is contained in:
Ignotus Peverell 2018-02-05 19:52:11 +00:00 committed by GitHub
parent fb46fad0ac
commit a9f4f36117
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 10 deletions

View file

@ -42,7 +42,6 @@ macro_rules! try_break {
match $inner {
Ok(v) => Some(v),
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => {
//println!("++ not ready");
None
}
Err(e) => {

View file

@ -69,20 +69,42 @@ enum_from_primitive! {
}
/// The default implementation of read_exact is useless with async TcpStream as
/// will return as soon as something has been read, regardless of completeness.
/// This implementation will block until it has read exactly `len` bytes and
/// returns them as a `vec<u8>`. Additionally, a timeout in milliseconds will
/// abort the read when it's met. Note that the timeout time is approximate.
pub fn read_exact(conn: &mut TcpStream, mut buf: &mut [u8], timeout: u32) -> io::Result<()> {
/// it will return as soon as something has been read, regardless of
/// whether the buffer has been filled (and then errors). This implementation
/// will block until it has read exactly `len` bytes and returns them as a
/// `vec<u8>`. Except for a timeout, this implementation will never return a
/// partially filled buffer.
///
/// The timeout in milliseconds aborts the read when it's met. Note that the
/// time is not guaranteed to be exact. To support cases where we want to poll
/// instead of blocking, a `block_on_empty` boolean, when false, ensures
/// `read_exact` returns early with a `io::ErrorKind::WouldBlock` if nothing
/// has been read from the socket.
pub fn read_exact(
conn: &mut TcpStream,
mut buf: &mut [u8],
timeout: u32,
block_on_empty: bool,
) -> io::Result<()> {
let sleep_time = time::Duration::from_millis(1);
let mut count = 0;
let mut read = 0;
loop {
match conn.read(buf) {
Ok(0) => break,
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
read += n;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if read == 0 && !block_on_empty {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "read_exact"));
}
}
Err(e) => return Err(e),
}
if !buf.is_empty() {
@ -104,7 +126,7 @@ pub fn read_exact(conn: &mut TcpStream, mut buf: &mut [u8], timeout: u32) -> io:
pub fn read_header(conn: &mut TcpStream) -> Result<MsgHeader, Error> {
let mut head = vec![0u8; HEADER_LEN as usize];
conn.read_exact(&mut head)?;
read_exact(conn, &mut head, 10000, false)?;
let header = ser::deserialize::<MsgHeader>(&mut &head[..])?;
if header.msg_len > MAX_MSG_LEN {
// TODO additional restrictions for each msg type to avoid 20MB pings...
@ -120,7 +142,7 @@ where
T: Readable,
{
let mut body = vec![0u8; h.msg_len as usize];
read_exact(conn, &mut body, 15000)?;
read_exact(conn, &mut body, 20000, true)?;
ser::deserialize(&mut &body[..]).map_err(From::from)
}

View file

@ -14,6 +14,7 @@
use std::sync::{Arc, RwLock};
use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
use std::thread;
use std::time::Duration;
use core::core;
@ -49,6 +50,7 @@ impl Server {
adapter: Arc<ChainAdapter>,
genesis: Hash,
) -> Result<Server, Error> {
Ok(Server {
config: config.clone(),
capabilities: capab,
@ -57,7 +59,19 @@ impl Server {
})
}
/// Starts a new TCP server and listen to incoming connections. This is a
/// blocking call until the TCP server stops.
pub fn listen(&self) -> Result<(), Error> {
// start peer monitoring thread
let peers_inner = self.peers.clone();
let _ = thread::Builder::new().name("p2p-monitor".to_string()).spawn(move || {
let total_diff = peers_inner.total_difficulty();
let total_height = peers_inner.total_height();
peers_inner.check_all(total_diff, total_height);
thread::sleep(Duration::from_secs(20));
});
// start TCP listener and handle incoming connections
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = TcpListener::bind(addr)?;

View file

@ -14,6 +14,7 @@
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate grin_util as util;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
@ -36,6 +37,8 @@ fn open_port() -> u16 {
// followed by a ping/pong exchange to make sure the connection is live.
#[test]
fn peer_handshake() {
util::init_test_logger();
let p2p_conf = p2p::P2PConfig {
host: "0.0.0.0".parse().unwrap(),
port: open_port(),