From bffd955c2684626f2627c1aea10b42ae9aa8b2b4 Mon Sep 17 00:00:00 2001 From: AntiochP <30642645+antiochp@users.noreply.github.com> Date: Wed, 13 Dec 2017 16:30:59 -0500 Subject: [PATCH] Revert "introduce cpu_pool to read_msg (#477)" (#478) This reverts commit 45b996210997b0d84840b2891c8bdc64edd5e5ed. --- grin/src/adapters.rs | 8 ----- p2p/Cargo.toml | 1 - p2p/src/conn.rs | 59 +++++++++++++------------------------ p2p/src/lib.rs | 2 -- p2p/src/peer.rs | 5 ---- p2p/src/protocol.rs | 24 ++++----------- p2p/src/server.rs | 17 +---------- p2p/src/types.rs | 4 --- p2p/tests/peer_handshake.rs | 3 +- 9 files changed, 28 insertions(+), 95 deletions(-) diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index cc036fb50..9e0cd3280 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -16,8 +16,6 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; -use cpupool::CpuPool; - use chain::{self, ChainAdapter}; use core::core::{self, Output}; use core::core::block::BlockHeader; @@ -38,14 +36,9 @@ pub struct NetToChainAdapter { p2p_server: OneTime>, tx_pool: Arc>>, syncing: AtomicBool, - cpu_pool: CpuPool, } impl NetAdapter for NetToChainAdapter { - fn cpu_pool(&self) -> CpuPool { - self.cpu_pool.clone() - } - fn total_difficulty(&self) -> Difficulty { self.chain.total_difficulty() } @@ -282,7 +275,6 @@ impl NetToChainAdapter { p2p_server: OneTime::new(), tx_pool: tx_pool, syncing: AtomicBool::new(true), - cpu_pool: CpuPool::new(1), } } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index f34bdffce..eca81a2ae 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -8,7 +8,6 @@ workspace = ".." bitflags = "^0.7.0" byteorder = "^0.5" futures = "^0.1.15" -futures-cpupool = "^0.1.3" slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] } net2 = "0.2.0" rand = "^0.3" diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 9b7e5c7dd..f41c71dbc 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -21,9 +21,9 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use futures; -use futures::{Future, Stream, stream}; +use futures::{Future, Stream}; +use futures::stream; use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; -use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::{read_exact, write_all}; @@ -94,7 +94,6 @@ impl Connection { /// itself. pub fn listen( conn: TcpStream, - pool: CpuPool, handler: F, ) -> (Connection, Box>) where @@ -125,10 +124,10 @@ impl Connection { }; // setup the reading future, getting messages from the peer and processing them - let read_msg = me.read_msg(tx, reader, handler, pool).map(|_| ()); + let read_msg = me.read_msg(tx, reader, handler).map(|_| ()); - // setting the writing future - // getting messages from our system and sending them out + // setting the writing future, getting messages from our system and sending + // them out let write_msg = me.write_msg(rx, writer).map(|_| ()); // select between our different futures and return them @@ -155,20 +154,16 @@ impl Connection { let sent_bytes = self.sent_bytes.clone(); let send_data = rx .map_err(|_| Error::ConnectionClose) - .map(move |data| { - trace!(LOGGER, "write_msg: start"); - // add the count of bytes sent + .map(move |data| { + // add the count of bytes sent let mut sent_bytes = sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; data }) - // write the data and make sure the future returns the right types + // write the data and make sure the future returns the right types .fold(writer, |writer, data| { - write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| { - trace!(LOGGER, "write_msg: done"); - writer - }) - }); + write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| writer) + }); Box::new(send_data) } @@ -179,37 +174,28 @@ impl Connection { sender: UnboundedSender>, reader: R, handler: F, - pool: CpuPool, ) -> Box> where F: Handler + 'static, - R: AsyncRead + Send + 'static, + R: AsyncRead + 'static, { // infinite iterator stream so we repeat the message reading logic until the - // peer is stopped + // peer is stopped let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>)); // setup the reading future, getting messages from the peer and processing them let recv_bytes = self.received_bytes.clone(); let handler = Arc::new(handler); - let mut count = 0; - - let read_msg = iter.buffered(1).fold(reader, move |reader, _| { - count += 1; - trace!(LOGGER, "read_msg: count (per buffered fold): {}", count); - + let read_msg = iter.fold(reader, move |reader, _| { let recv_bytes = recv_bytes.clone(); let handler = handler.clone(); let sender_inner = sender.clone(); - let pool = pool.clone(); // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) .from_err() .and_then(move |(reader, buf)| { - trace!(LOGGER, "read_msg: start"); - let header = try!(ser::deserialize::(&mut &buf[..])); Ok((reader, header)) }) @@ -224,16 +210,14 @@ impl Connection { let mut recv_bytes = recv_bytes.lock().unwrap(); *recv_bytes += header.serialized_len() + header.msg_len; - pool.spawn_fn(move || { - let msg_type = header.msg_type; - if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { - debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e); - return Err(Error::Serialization(e)); - } + // and handle the different message types + let msg_type = header.msg_type; + if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { + debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e); + return Err(Error::Serialization(e)); + } - trace!(LOGGER, "read_msg: done (via cpu_pool)"); - Ok(reader) - }) + Ok(reader) }) }); Box::new(read_msg) @@ -276,7 +260,6 @@ impl TimeoutConnection { /// Same as Connection pub fn listen( conn: TcpStream, - pool: CpuPool, handler: F, ) -> (TimeoutConnection, Box>) where @@ -287,7 +270,7 @@ impl TimeoutConnection { // Decorates the handler to remove the "subscription" from the expected // responses. We got our replies, so no timeout should occur. let exp = expects.clone(); - let (conn, fut) = Connection::listen(conn, pool, move |sender, header: MsgHeader, data| { + let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| { let msg_type = header.msg_type; let recv_h = try!(handler.handle(sender, header, data)); diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 01980e1ed..cf23fc837 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -26,8 +26,6 @@ extern crate bytes; #[macro_use] extern crate enum_primitive; extern crate futures; -extern crate futures_cpupool; - #[macro_use] extern crate grin_core as core; extern crate grin_store; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index aa0981adc..9ad4b062c 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -16,7 +16,6 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use futures::Future; -use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use core::core; @@ -222,10 +221,6 @@ impl TrackingAdapter { } impl NetAdapter for TrackingAdapter { - fn cpu_pool(&self) -> CpuPool { - self.adapter.cpu_pool() - } - fn total_difficulty(&self) -> Difficulty { self.adapter.total_difficulty() } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index cb5578104..9dcad8c14 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -50,8 +50,7 @@ impl Protocol for ProtocolV1 { adapter: Arc, addr: SocketAddr, ) -> Box> { - let pool = adapter.cpu_pool(); - let (conn, listener) = TimeoutConnection::listen(conn, pool, move |sender, header, data| { + let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { let adapt = adapter.as_ref(); handle_payload(adapt, sender, header, data, addr) }); @@ -153,11 +152,7 @@ fn handle_payload( &MsgHeader::new(Type::Pong, body_data.len() as u64), )); data.append(&mut body_data); - - if let Err(e) = sender.unbounded_send(data) { - debug!(LOGGER, "handle_payload: Ping, error sending: {:?}", e); - } - + sender.unbounded_send(data).unwrap(); Ok(None) } Type::Pong => { @@ -185,18 +180,13 @@ fn handle_payload( &MsgHeader::new(Type::Block, body_data.len() as u64), )); data.append(&mut body_data); - if let Err(e) = sender.unbounded_send(data) { - debug!(LOGGER, "handle_payload: GetBlock, error sending: {:?}", e); - } + sender.unbounded_send(data).unwrap(); } Ok(None) } Type::Block => { let b = ser::deserialize::(&mut &buf[..])?; let bh = b.hash(); - - debug!(LOGGER, "handle_payload: Block {}", bh); - adapter.block_received(b, addr); Ok(Some(bh)) } @@ -217,9 +207,7 @@ fn handle_payload( &MsgHeader::new(Type::Headers, body_data.len() as u64), )); data.append(&mut body_data); - if let Err(e) = sender.unbounded_send(data) { - debug!(LOGGER, "handle_payload: GetHeaders, error sending: {:?}", e); - } + sender.unbounded_send(data).unwrap(); Ok(None) } @@ -246,9 +234,7 @@ fn handle_payload( &MsgHeader::new(Type::PeerAddrs, body_data.len() as u64), )); data.append(&mut body_data); - if let Err(e) = sender.unbounded_send(data) { - debug!(LOGGER, "handle_payload: GetPeerAddrs, error sending: {:?}", e); - } + sender.unbounded_send(data).unwrap(); Ok(None) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index ce7478c32..4327c139d 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -24,7 +24,6 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; use futures::future::{self, IntoFuture}; -use futures_cpupool::CpuPool; use rand::{thread_rng, Rng}; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; @@ -40,22 +39,8 @@ use types::*; use util::LOGGER; /// A no-op network adapter used for testing. -pub struct DummyAdapter { - cpu_pool: CpuPool, -} - -impl DummyAdapter { - pub fn new() -> DummyAdapter { - DummyAdapter { - cpu_pool: CpuPool::new(1), - } - } -} - +pub struct DummyAdapter {} impl NetAdapter for DummyAdapter { - fn cpu_pool(&self) -> CpuPool { - self.cpu_pool.clone() - } fn total_difficulty(&self) -> Difficulty { Difficulty::one() } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index e19e6580c..1cf6a0b59 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -18,7 +18,6 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use futures::Future; -use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use tokio_timer::TimerError; @@ -198,7 +197,4 @@ pub trait NetAdapter: Sync + Send { /// Heard total_difficulty from a connected peer (via ping/pong). fn peer_difficulty(&self, SocketAddr, Difficulty); - - /// Central threadpool that we can use to handle requests from all our peers. - fn cpu_pool(&self) -> CpuPool; } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index a3d780c6b..5756761cd 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -13,7 +13,6 @@ // limitations under the License. extern crate futures; -extern crate futures_cpupool; extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate tokio_core; @@ -37,7 +36,7 @@ fn peer_handshake() { let mut evtlp = Core::new().unwrap(); let handle = evtlp.handle(); let p2p_conf = p2p::P2PConfig::default(); - let net_adapter = Arc::new(p2p::DummyAdapter::new()); + let net_adapter = Arc::new(p2p::DummyAdapter {}); let server = p2p::Server::new( ".grin".to_owned(), p2p::UNKNOWN,