Revert "introduce cpu_pool to read_msg (#477)" (#478)

This reverts commit 45b9962109.
This commit is contained in:
AntiochP 2017-12-13 16:30:59 -05:00 committed by GitHub
parent 45b9962109
commit bffd955c26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 28 additions and 95 deletions

View file

@ -16,8 +16,6 @@ use std::net::SocketAddr;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use cpupool::CpuPool;
use chain::{self, ChainAdapter}; use chain::{self, ChainAdapter};
use core::core::{self, Output}; use core::core::{self, Output};
use core::core::block::BlockHeader; use core::core::block::BlockHeader;
@ -38,14 +36,9 @@ pub struct NetToChainAdapter {
p2p_server: OneTime<Arc<p2p::Server>>, p2p_server: OneTime<Arc<p2p::Server>>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>, tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
syncing: AtomicBool, syncing: AtomicBool,
cpu_pool: CpuPool,
} }
impl NetAdapter for NetToChainAdapter { impl NetAdapter for NetToChainAdapter {
fn cpu_pool(&self) -> CpuPool {
self.cpu_pool.clone()
}
fn total_difficulty(&self) -> Difficulty { fn total_difficulty(&self) -> Difficulty {
self.chain.total_difficulty() self.chain.total_difficulty()
} }
@ -282,7 +275,6 @@ impl NetToChainAdapter {
p2p_server: OneTime::new(), p2p_server: OneTime::new(),
tx_pool: tx_pool, tx_pool: tx_pool,
syncing: AtomicBool::new(true), syncing: AtomicBool::new(true),
cpu_pool: CpuPool::new(1),
} }
} }

View file

@ -8,7 +8,6 @@ workspace = ".."
bitflags = "^0.7.0" bitflags = "^0.7.0"
byteorder = "^0.5" byteorder = "^0.5"
futures = "^0.1.15" futures = "^0.1.15"
futures-cpupool = "^0.1.3"
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] } slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
net2 = "0.2.0" net2 = "0.2.0"
rand = "^0.3" rand = "^0.3"

View file

@ -21,9 +21,9 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures; use futures;
use futures::{Future, Stream, stream}; use futures::{Future, Stream};
use futures::stream;
use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all}; use tokio_io::io::{read_exact, write_all};
@ -94,7 +94,6 @@ impl Connection {
/// itself. /// itself.
pub fn listen<F>( pub fn listen<F>(
conn: TcpStream, conn: TcpStream,
pool: CpuPool,
handler: F, handler: F,
) -> (Connection, Box<Future<Item = (), Error = Error>>) ) -> (Connection, Box<Future<Item = (), Error = Error>>)
where where
@ -125,10 +124,10 @@ impl Connection {
}; };
// setup the reading future, getting messages from the peer and processing them // setup the reading future, getting messages from the peer and processing them
let read_msg = me.read_msg(tx, reader, handler, pool).map(|_| ()); let read_msg = me.read_msg(tx, reader, handler).map(|_| ());
// setting the writing future // setting the writing future, getting messages from our system and sending
// getting messages from our system and sending them out // them out
let write_msg = me.write_msg(rx, writer).map(|_| ()); let write_msg = me.write_msg(rx, writer).map(|_| ());
// select between our different futures and return them // select between our different futures and return them
@ -155,20 +154,16 @@ impl Connection {
let sent_bytes = self.sent_bytes.clone(); let sent_bytes = self.sent_bytes.clone();
let send_data = rx let send_data = rx
.map_err(|_| Error::ConnectionClose) .map_err(|_| Error::ConnectionClose)
.map(move |data| { .map(move |data| {
trace!(LOGGER, "write_msg: start"); // add the count of bytes sent
// add the count of bytes sent
let mut sent_bytes = sent_bytes.lock().unwrap(); let mut sent_bytes = sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64; *sent_bytes += data.len() as u64;
data data
}) })
// write the data and make sure the future returns the right types // write the data and make sure the future returns the right types
.fold(writer, |writer, data| { .fold(writer, |writer, data| {
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| { write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| writer)
trace!(LOGGER, "write_msg: done"); });
writer
})
});
Box::new(send_data) Box::new(send_data)
} }
@ -179,37 +174,28 @@ impl Connection {
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
reader: R, reader: R,
handler: F, handler: F,
pool: CpuPool,
) -> Box<Future<Item = R, Error = Error>> ) -> Box<Future<Item = R, Error = Error>>
where where
F: Handler + 'static, F: Handler + 'static,
R: AsyncRead + Send + 'static, R: AsyncRead + 'static,
{ {
// infinite iterator stream so we repeat the message reading logic until the // infinite iterator stream so we repeat the message reading logic until the
// peer is stopped // peer is stopped
let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>)); let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>));
// setup the reading future, getting messages from the peer and processing them // setup the reading future, getting messages from the peer and processing them
let recv_bytes = self.received_bytes.clone(); let recv_bytes = self.received_bytes.clone();
let handler = Arc::new(handler); let handler = Arc::new(handler);
let mut count = 0; let read_msg = iter.fold(reader, move |reader, _| {
let read_msg = iter.buffered(1).fold(reader, move |reader, _| {
count += 1;
trace!(LOGGER, "read_msg: count (per buffered fold): {}", count);
let recv_bytes = recv_bytes.clone(); let recv_bytes = recv_bytes.clone();
let handler = handler.clone(); let handler = handler.clone();
let sender_inner = sender.clone(); let sender_inner = sender.clone();
let pool = pool.clone();
// first read the message header // first read the message header
read_exact(reader, vec![0u8; HEADER_LEN as usize]) read_exact(reader, vec![0u8; HEADER_LEN as usize])
.from_err() .from_err()
.and_then(move |(reader, buf)| { .and_then(move |(reader, buf)| {
trace!(LOGGER, "read_msg: start");
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..])); let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
Ok((reader, header)) Ok((reader, header))
}) })
@ -224,16 +210,14 @@ impl Connection {
let mut recv_bytes = recv_bytes.lock().unwrap(); let mut recv_bytes = recv_bytes.lock().unwrap();
*recv_bytes += header.serialized_len() + header.msg_len; *recv_bytes += header.serialized_len() + header.msg_len;
pool.spawn_fn(move || { // and handle the different message types
let msg_type = header.msg_type; let msg_type = header.msg_type;
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e); debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
return Err(Error::Serialization(e)); return Err(Error::Serialization(e));
} }
trace!(LOGGER, "read_msg: done (via cpu_pool)"); Ok(reader)
Ok(reader)
})
}) })
}); });
Box::new(read_msg) Box::new(read_msg)
@ -276,7 +260,6 @@ impl TimeoutConnection {
/// Same as Connection /// Same as Connection
pub fn listen<F>( pub fn listen<F>(
conn: TcpStream, conn: TcpStream,
pool: CpuPool,
handler: F, handler: F,
) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>) ) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
where where
@ -287,7 +270,7 @@ impl TimeoutConnection {
// Decorates the handler to remove the "subscription" from the expected // Decorates the handler to remove the "subscription" from the expected
// responses. We got our replies, so no timeout should occur. // responses. We got our replies, so no timeout should occur.
let exp = expects.clone(); let exp = expects.clone();
let (conn, fut) = Connection::listen(conn, pool, move |sender, header: MsgHeader, data| { let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| {
let msg_type = header.msg_type; let msg_type = header.msg_type;
let recv_h = try!(handler.handle(sender, header, data)); let recv_h = try!(handler.handle(sender, header, data));

View file

@ -26,8 +26,6 @@ extern crate bytes;
#[macro_use] #[macro_use]
extern crate enum_primitive; extern crate enum_primitive;
extern crate futures; extern crate futures;
extern crate futures_cpupool;
#[macro_use] #[macro_use]
extern crate grin_core as core; extern crate grin_core as core;
extern crate grin_store; extern crate grin_store;

View file

@ -16,7 +16,6 @@ use std::net::SocketAddr;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use futures::Future; use futures::Future;
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use core::core; use core::core;
@ -222,10 +221,6 @@ impl TrackingAdapter {
} }
impl NetAdapter for TrackingAdapter { impl NetAdapter for TrackingAdapter {
fn cpu_pool(&self) -> CpuPool {
self.adapter.cpu_pool()
}
fn total_difficulty(&self) -> Difficulty { fn total_difficulty(&self) -> Difficulty {
self.adapter.total_difficulty() self.adapter.total_difficulty()
} }

View file

@ -50,8 +50,7 @@ impl Protocol for ProtocolV1 {
adapter: Arc<NetAdapter>, adapter: Arc<NetAdapter>,
addr: SocketAddr, addr: SocketAddr,
) -> Box<Future<Item = (), Error = Error>> { ) -> Box<Future<Item = (), Error = Error>> {
let pool = adapter.cpu_pool(); let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
let (conn, listener) = TimeoutConnection::listen(conn, pool, move |sender, header, data| {
let adapt = adapter.as_ref(); let adapt = adapter.as_ref();
handle_payload(adapt, sender, header, data, addr) handle_payload(adapt, sender, header, data, addr)
}); });
@ -153,11 +152,7 @@ fn handle_payload(
&MsgHeader::new(Type::Pong, body_data.len() as u64), &MsgHeader::new(Type::Pong, body_data.len() as u64),
)); ));
data.append(&mut body_data); data.append(&mut body_data);
sender.unbounded_send(data).unwrap();
if let Err(e) = sender.unbounded_send(data) {
debug!(LOGGER, "handle_payload: Ping, error sending: {:?}", e);
}
Ok(None) Ok(None)
} }
Type::Pong => { Type::Pong => {
@ -185,18 +180,13 @@ fn handle_payload(
&MsgHeader::new(Type::Block, body_data.len() as u64), &MsgHeader::new(Type::Block, body_data.len() as u64),
)); ));
data.append(&mut body_data); data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) { sender.unbounded_send(data).unwrap();
debug!(LOGGER, "handle_payload: GetBlock, error sending: {:?}", e);
}
} }
Ok(None) Ok(None)
} }
Type::Block => { Type::Block => {
let b = ser::deserialize::<core::Block>(&mut &buf[..])?; let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
let bh = b.hash(); let bh = b.hash();
debug!(LOGGER, "handle_payload: Block {}", bh);
adapter.block_received(b, addr); adapter.block_received(b, addr);
Ok(Some(bh)) Ok(Some(bh))
} }
@ -217,9 +207,7 @@ fn handle_payload(
&MsgHeader::new(Type::Headers, body_data.len() as u64), &MsgHeader::new(Type::Headers, body_data.len() as u64),
)); ));
data.append(&mut body_data); data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) { sender.unbounded_send(data).unwrap();
debug!(LOGGER, "handle_payload: GetHeaders, error sending: {:?}", e);
}
Ok(None) Ok(None)
} }
@ -246,9 +234,7 @@ fn handle_payload(
&MsgHeader::new(Type::PeerAddrs, body_data.len() as u64), &MsgHeader::new(Type::PeerAddrs, body_data.len() as u64),
)); ));
data.append(&mut body_data); data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) { sender.unbounded_send(data).unwrap();
debug!(LOGGER, "handle_payload: GetPeerAddrs, error sending: {:?}", e);
}
Ok(None) Ok(None)
} }

View file

@ -24,7 +24,6 @@ use std::time::Duration;
use futures; use futures;
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::future::{self, IntoFuture}; use futures::future::{self, IntoFuture};
use futures_cpupool::CpuPool;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor; use tokio_core::reactor;
@ -40,22 +39,8 @@ use types::*;
use util::LOGGER; use util::LOGGER;
/// A no-op network adapter used for testing. /// A no-op network adapter used for testing.
pub struct DummyAdapter { pub struct DummyAdapter {}
cpu_pool: CpuPool,
}
impl DummyAdapter {
pub fn new() -> DummyAdapter {
DummyAdapter {
cpu_pool: CpuPool::new(1),
}
}
}
impl NetAdapter for DummyAdapter { impl NetAdapter for DummyAdapter {
fn cpu_pool(&self) -> CpuPool {
self.cpu_pool.clone()
}
fn total_difficulty(&self) -> Difficulty { fn total_difficulty(&self) -> Difficulty {
Difficulty::one() Difficulty::one()
} }

View file

@ -18,7 +18,6 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use futures::Future; use futures::Future;
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use tokio_timer::TimerError; use tokio_timer::TimerError;
@ -198,7 +197,4 @@ pub trait NetAdapter: Sync + Send {
/// Heard total_difficulty from a connected peer (via ping/pong). /// Heard total_difficulty from a connected peer (via ping/pong).
fn peer_difficulty(&self, SocketAddr, Difficulty); fn peer_difficulty(&self, SocketAddr, Difficulty);
/// Central threadpool that we can use to handle requests from all our peers.
fn cpu_pool(&self) -> CpuPool;
} }

View file

@ -13,7 +13,6 @@
// limitations under the License. // limitations under the License.
extern crate futures; extern crate futures;
extern crate futures_cpupool;
extern crate grin_core as core; extern crate grin_core as core;
extern crate grin_p2p as p2p; extern crate grin_p2p as p2p;
extern crate tokio_core; extern crate tokio_core;
@ -37,7 +36,7 @@ fn peer_handshake() {
let mut evtlp = Core::new().unwrap(); let mut evtlp = Core::new().unwrap();
let handle = evtlp.handle(); let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default(); let p2p_conf = p2p::P2PConfig::default();
let net_adapter = Arc::new(p2p::DummyAdapter::new()); let net_adapter = Arc::new(p2p::DummyAdapter {});
let server = p2p::Server::new( let server = p2p::Server::new(
".grin".to_owned(), ".grin".to_owned(),
p2p::UNKNOWN, p2p::UNKNOWN,