diff --git a/grin/src/server.rs b/grin/src/server.rs index 48baa35b8..2021a2e6b 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -23,6 +23,7 @@ use std::thread; use std::time; use futures::{Future, Stream}; +use cpupool::CpuPool; use tokio_core::reactor; use tokio_timer::Timer; @@ -117,12 +118,17 @@ impl Server { tx_pool.clone(), )); + // thread pool (single thread) for offloading handler.handle() + // work from the main run loop in p2p_server + let cpu_pool = CpuPool::new(1); + let p2p_server = Arc::new(p2p::Server::new( config.db_root.clone(), config.capabilities, config.p2p_config.unwrap(), net_adapter.clone(), genesis.hash(), + cpu_pool.clone(), )?); chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index eca81a2ae..f34bdffce 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -8,6 +8,7 @@ workspace = ".." bitflags = "^0.7.0" byteorder = "^0.5" futures = "^0.1.15" +futures-cpupool = "^0.1.3" slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] } net2 = "0.2.0" rand = "^0.3" diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index f41c71dbc..9b7e5c7dd 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -21,9 +21,9 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use futures; -use futures::{Future, Stream}; -use futures::stream; +use futures::{Future, Stream, stream}; use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; +use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::{read_exact, write_all}; @@ -94,6 +94,7 @@ impl Connection { /// itself. pub fn listen( conn: TcpStream, + pool: CpuPool, handler: F, ) -> (Connection, Box>) where @@ -124,10 +125,10 @@ impl Connection { }; // setup the reading future, getting messages from the peer and processing them - let read_msg = me.read_msg(tx, reader, handler).map(|_| ()); + let read_msg = me.read_msg(tx, reader, handler, pool).map(|_| ()); - // setting the writing future, getting messages from our system and sending - // them out + // setting the writing future + // getting messages from our system and sending them out let write_msg = me.write_msg(rx, writer).map(|_| ()); // select between our different futures and return them @@ -154,16 +155,20 @@ impl Connection { let sent_bytes = self.sent_bytes.clone(); let send_data = rx .map_err(|_| Error::ConnectionClose) - .map(move |data| { - // add the count of bytes sent + .map(move |data| { + trace!(LOGGER, "write_msg: start"); + // add the count of bytes sent let mut sent_bytes = sent_bytes.lock().unwrap(); *sent_bytes += data.len() as u64; data }) - // write the data and make sure the future returns the right types + // write the data and make sure the future returns the right types .fold(writer, |writer, data| { - write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| writer) - }); + write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| { + trace!(LOGGER, "write_msg: done"); + writer + }) + }); Box::new(send_data) } @@ -174,28 +179,37 @@ impl Connection { sender: UnboundedSender>, reader: R, handler: F, + pool: CpuPool, ) -> Box> where F: Handler + 'static, - R: AsyncRead + 'static, + R: AsyncRead + Send + 'static, { // infinite iterator stream so we repeat the message reading logic until the - // peer is stopped + // peer is stopped let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>)); // setup the reading future, getting messages from the peer and processing them let recv_bytes = self.received_bytes.clone(); let handler = Arc::new(handler); - let read_msg = iter.fold(reader, move |reader, _| { + let mut count = 0; + + let read_msg = iter.buffered(1).fold(reader, move |reader, _| { + count += 1; + trace!(LOGGER, "read_msg: count (per buffered fold): {}", count); + let recv_bytes = recv_bytes.clone(); let handler = handler.clone(); let sender_inner = sender.clone(); + let pool = pool.clone(); // first read the message header read_exact(reader, vec![0u8; HEADER_LEN as usize]) .from_err() .and_then(move |(reader, buf)| { + trace!(LOGGER, "read_msg: start"); + let header = try!(ser::deserialize::(&mut &buf[..])); Ok((reader, header)) }) @@ -210,14 +224,16 @@ impl Connection { let mut recv_bytes = recv_bytes.lock().unwrap(); *recv_bytes += header.serialized_len() + header.msg_len; - // and handle the different message types - let msg_type = header.msg_type; - if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { - debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e); - return Err(Error::Serialization(e)); - } + pool.spawn_fn(move || { + let msg_type = header.msg_type; + if let Err(e) = handler.handle(sender_inner.clone(), header, buf) { + debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e); + return Err(Error::Serialization(e)); + } - Ok(reader) + trace!(LOGGER, "read_msg: done (via cpu_pool)"); + Ok(reader) + }) }) }); Box::new(read_msg) @@ -260,6 +276,7 @@ impl TimeoutConnection { /// Same as Connection pub fn listen( conn: TcpStream, + pool: CpuPool, handler: F, ) -> (TimeoutConnection, Box>) where @@ -270,7 +287,7 @@ impl TimeoutConnection { // Decorates the handler to remove the "subscription" from the expected // responses. We got our replies, so no timeout should occur. let exp = expects.clone(); - let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| { + let (conn, fut) = Connection::listen(conn, pool, move |sender, header: MsgHeader, data| { let msg_type = header.msg_type; let recv_h = try!(handler.handle(sender, header, data)); diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 09739187f..8bb95bd4c 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -26,6 +26,8 @@ extern crate bytes; #[macro_use] extern crate enum_primitive; extern crate futures; +extern crate futures_cpupool; + #[macro_use] extern crate grin_core as core; extern crate grin_store; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 880579d43..91e389d76 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -16,6 +16,7 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use futures::Future; +use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use core::core; @@ -89,12 +90,12 @@ impl Peer { /// Main peer loop listening for messages and forwarding to the rest of the /// system. - pub fn run(&self, conn: TcpStream) -> Box> { + pub fn run(&self, conn: TcpStream, pool: CpuPool) -> Box> { let addr = self.info.addr; let state = self.state.clone(); let adapter = Arc::new(self.tracking_adapter.clone()); - Box::new(self.proto.handle(conn, adapter, addr).then(move |res| { + Box::new(self.proto.handle(conn, adapter, addr, pool).then(move |res| { // handle disconnection, standard disconnections aren't considered an error let mut state = state.write().unwrap(); match res { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index cd2d7a72e..97a69fda8 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -17,6 +17,7 @@ use std::net::SocketAddr; use futures::Future; use futures::sync::mpsc::UnboundedSender; +use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use core::core; @@ -49,8 +50,9 @@ impl Protocol for ProtocolV1 { conn: TcpStream, adapter: Arc, addr: SocketAddr, + pool: CpuPool, ) -> Box> { - let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { + let (conn, listener) = TimeoutConnection::listen(conn, pool, move |sender, header, data| { let adapt = adapter.as_ref(); handle_payload(adapt, sender, header, data, addr) }); @@ -156,7 +158,11 @@ fn handle_payload( &MsgHeader::new(Type::Pong, body_data.len() as u64), )); data.append(&mut body_data); - sender.unbounded_send(data).unwrap(); + + if let Err(e) = sender.unbounded_send(data) { + debug!(LOGGER, "handle_payload: Ping, error sending: {:?}", e); + } + Ok(None) } Type::Pong => { @@ -184,13 +190,18 @@ fn handle_payload( &MsgHeader::new(Type::Block, body_data.len() as u64), )); data.append(&mut body_data); - sender.unbounded_send(data).unwrap(); + if let Err(e) = sender.unbounded_send(data) { + debug!(LOGGER, "handle_payload: GetBlock, error sending: {:?}", e); + } } Ok(None) } Type::Block => { let b = ser::deserialize::(&mut &buf[..])?; let bh = b.hash(); + + debug!(LOGGER, "handle_payload: Block {}", bh); + adapter.block_received(b, addr); Ok(Some(bh)) } @@ -211,7 +222,9 @@ fn handle_payload( &MsgHeader::new(Type::Headers, body_data.len() as u64), )); data.append(&mut body_data); - sender.unbounded_send(data).unwrap(); + if let Err(e) = sender.unbounded_send(data) { + debug!(LOGGER, "handle_payload: GetHeaders, error sending: {:?}", e); + } Ok(None) } @@ -238,7 +251,9 @@ fn handle_payload( &MsgHeader::new(Type::PeerAddrs, body_data.len() as u64), )); data.append(&mut body_data); - sender.unbounded_send(data).unwrap(); + if let Err(e) = sender.unbounded_send(data) { + debug!(LOGGER, "handle_payload: GetPeerAddrs, error sending: {:?}", e); + } Ok(None) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 59c1d570d..3b4e15a77 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -23,6 +23,7 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; use futures::future::{self, IntoFuture}; +use futures_cpupool::CpuPool; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; use tokio_timer::Timer; @@ -39,6 +40,7 @@ use util::LOGGER; /// A no-op network adapter used for testing. pub struct DummyAdapter {} + impl ChainAdapter for DummyAdapter { fn total_difficulty(&self) -> Difficulty { Difficulty::one() @@ -56,6 +58,7 @@ impl ChainAdapter for DummyAdapter { None } } + impl NetAdapter for DummyAdapter { fn find_peer_addrs(&self, _: Capabilities) -> Vec { vec![] @@ -71,6 +74,7 @@ pub struct Server { capabilities: Capabilities, handshake: Arc, pub peers: Peers, + pool: CpuPool, stop: RefCell>>, } @@ -86,12 +90,14 @@ impl Server { config: P2PConfig, adapter: Arc, genesis: Hash, + pool: CpuPool, ) -> Result { Ok(Server { config: config, capabilities: capab, handshake: Arc::new(Handshake::new(genesis)), peers: Peers::new(PeerStore::new(db_root)?, adapter), + pool: pool, stop: RefCell::new(None), }) } @@ -106,6 +112,7 @@ impl Server { let handshake = self.handshake.clone(); let peers = self.peers.clone(); let capab = self.capabilities.clone(); + let pool = self.pool.clone(); // main peer acceptance future handling handshake let hp = h.clone(); @@ -116,6 +123,7 @@ impl Server { let peers2 = peers.clone(); let handshake = handshake.clone(); let hp = hp.clone(); + let pool = pool.clone(); future::ok(conn).and_then(move |conn| { // Refuse banned peers connection @@ -149,7 +157,7 @@ impl Server { // run the main peer protocol timed_peer.and_then(move |(conn, peer)| { let peer = peer.read().unwrap(); - peer.run(conn) + peer.run(conn, pool) }) }) }); @@ -220,6 +228,8 @@ impl Server { let peers = self.peers.clone(); let handshake = self.handshake.clone(); let capab = self.capabilities.clone(); + let pool = self.pool.clone(); + let self_addr = SocketAddr::new(self.config.host, self.config.port); let timer = Timer::default(); @@ -251,7 +261,7 @@ impl Server { }) .and_then(move |(socket, peer)| { let peer_inner = peer.read().unwrap(); - h2.spawn(peer_inner.run(socket).map_err(|e| { + h2.spawn(peer_inner.run(socket, pool).map_err(|e| { error!(LOGGER, "Peer error: {:?}", e); () })); diff --git a/p2p/src/types.rs b/p2p/src/types.rs index d0c292104..2ebf92f54 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -18,6 +18,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use futures::Future; +use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use tokio_timer::TimerError; @@ -131,7 +132,7 @@ pub trait Protocol { /// be known already, usually passed during construction. Will typically /// block so needs to be called withing a coroutine. Should also be called /// only once. - fn handle(&self, conn: TcpStream, na: Arc, addr: SocketAddr) + fn handle(&self, conn: TcpStream, na: Arc, addr: SocketAddr, pool: CpuPool) -> Box>; /// Sends a ping message to the remote peer. diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index e872739a9..df1f93ab5 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -13,6 +13,7 @@ // limitations under the License. extern crate futures; +extern crate futures_cpupool; extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate tokio_core; @@ -22,6 +23,7 @@ use std::sync::Arc; use std::time; use futures::future::Future; +use futures_cpupool::CpuPool; use tokio_core::net::TcpStream; use tokio_core::reactor::{self, Core}; @@ -37,12 +39,14 @@ fn peer_handshake() { let handle = evtlp.handle(); let p2p_conf = p2p::P2PConfig::default(); let net_adapter = Arc::new(p2p::DummyAdapter {}); + let pool = CpuPool::new(1); let server = p2p::Server::new( ".grin".to_owned(), p2p::UNKNOWN, p2p_conf, net_adapter.clone(), Hash::from_vec(vec![]), + pool.clone(), ).unwrap(); let run_server = server.start(handle.clone()); let my_addr = "127.0.0.1:5000".parse().unwrap(); @@ -71,7 +75,7 @@ fn peer_handshake() { ) }) .and_then(move |(socket, peer)| { - rhandle.spawn(peer.run(socket).map_err(|e| { + rhandle.spawn(peer.run(socket, pool).map_err(|e| { panic!("Client run failed: {:?}", e); })); peer.send_ping(Difficulty::one(), 0).unwrap();