mirror of
https://github.com/mimblewimble/grin.git
synced 2025-01-21 03:21:08 +03:00
introduce cpu_pool to read_msg (#477)
* introduce cpu_pool to read_msg * cleanup and less unwrap() in handler * expose cpu_pool as part of net_adapter (reuse across all peer connections and handlers) * fix DummyAdaptor for test usage * add new() to DummyAdapter for convenience
This commit is contained in:
parent
d92b6ddddb
commit
45b9962109
9 changed files with 95 additions and 28 deletions
|
@ -16,6 +16,8 @@ use std::net::SocketAddr;
|
|||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use cpupool::CpuPool;
|
||||
|
||||
use chain::{self, ChainAdapter};
|
||||
use core::core::{self, Output};
|
||||
use core::core::block::BlockHeader;
|
||||
|
@ -36,9 +38,14 @@ pub struct NetToChainAdapter {
|
|||
p2p_server: OneTime<Arc<p2p::Server>>,
|
||||
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
|
||||
syncing: AtomicBool,
|
||||
cpu_pool: CpuPool,
|
||||
}
|
||||
|
||||
impl NetAdapter for NetToChainAdapter {
|
||||
fn cpu_pool(&self) -> CpuPool {
|
||||
self.cpu_pool.clone()
|
||||
}
|
||||
|
||||
fn total_difficulty(&self) -> Difficulty {
|
||||
self.chain.total_difficulty()
|
||||
}
|
||||
|
@ -275,6 +282,7 @@ impl NetToChainAdapter {
|
|||
p2p_server: OneTime::new(),
|
||||
tx_pool: tx_pool,
|
||||
syncing: AtomicBool::new(true),
|
||||
cpu_pool: CpuPool::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ workspace = ".."
|
|||
bitflags = "^0.7.0"
|
||||
byteorder = "^0.5"
|
||||
futures = "^0.1.15"
|
||||
futures-cpupool = "^0.1.3"
|
||||
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
|
||||
net2 = "0.2.0"
|
||||
rand = "^0.3"
|
||||
|
|
|
@ -21,9 +21,9 @@ use std::sync::{Arc, Mutex};
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures;
|
||||
use futures::{Future, Stream};
|
||||
use futures::stream;
|
||||
use futures::{Future, Stream, stream};
|
||||
use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io::io::{read_exact, write_all};
|
||||
|
@ -94,6 +94,7 @@ impl Connection {
|
|||
/// itself.
|
||||
pub fn listen<F>(
|
||||
conn: TcpStream,
|
||||
pool: CpuPool,
|
||||
handler: F,
|
||||
) -> (Connection, Box<Future<Item = (), Error = Error>>)
|
||||
where
|
||||
|
@ -124,10 +125,10 @@ impl Connection {
|
|||
};
|
||||
|
||||
// setup the reading future, getting messages from the peer and processing them
|
||||
let read_msg = me.read_msg(tx, reader, handler).map(|_| ());
|
||||
let read_msg = me.read_msg(tx, reader, handler, pool).map(|_| ());
|
||||
|
||||
// setting the writing future, getting messages from our system and sending
|
||||
// them out
|
||||
// setting the writing future
|
||||
// getting messages from our system and sending them out
|
||||
let write_msg = me.write_msg(rx, writer).map(|_| ());
|
||||
|
||||
// select between our different futures and return them
|
||||
|
@ -154,16 +155,20 @@ impl Connection {
|
|||
let sent_bytes = self.sent_bytes.clone();
|
||||
let send_data = rx
|
||||
.map_err(|_| Error::ConnectionClose)
|
||||
.map(move |data| {
|
||||
// add the count of bytes sent
|
||||
.map(move |data| {
|
||||
trace!(LOGGER, "write_msg: start");
|
||||
// add the count of bytes sent
|
||||
let mut sent_bytes = sent_bytes.lock().unwrap();
|
||||
*sent_bytes += data.len() as u64;
|
||||
data
|
||||
})
|
||||
// write the data and make sure the future returns the right types
|
||||
// write the data and make sure the future returns the right types
|
||||
.fold(writer, |writer, data| {
|
||||
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| writer)
|
||||
});
|
||||
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| {
|
||||
trace!(LOGGER, "write_msg: done");
|
||||
writer
|
||||
})
|
||||
});
|
||||
Box::new(send_data)
|
||||
}
|
||||
|
||||
|
@ -174,28 +179,37 @@ impl Connection {
|
|||
sender: UnboundedSender<Vec<u8>>,
|
||||
reader: R,
|
||||
handler: F,
|
||||
pool: CpuPool,
|
||||
) -> Box<Future<Item = R, Error = Error>>
|
||||
where
|
||||
F: Handler + 'static,
|
||||
R: AsyncRead + 'static,
|
||||
R: AsyncRead + Send + 'static,
|
||||
{
|
||||
// infinite iterator stream so we repeat the message reading logic until the
|
||||
// peer is stopped
|
||||
// peer is stopped
|
||||
let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>));
|
||||
|
||||
// setup the reading future, getting messages from the peer and processing them
|
||||
let recv_bytes = self.received_bytes.clone();
|
||||
let handler = Arc::new(handler);
|
||||
|
||||
let read_msg = iter.fold(reader, move |reader, _| {
|
||||
let mut count = 0;
|
||||
|
||||
let read_msg = iter.buffered(1).fold(reader, move |reader, _| {
|
||||
count += 1;
|
||||
trace!(LOGGER, "read_msg: count (per buffered fold): {}", count);
|
||||
|
||||
let recv_bytes = recv_bytes.clone();
|
||||
let handler = handler.clone();
|
||||
let sender_inner = sender.clone();
|
||||
let pool = pool.clone();
|
||||
|
||||
// first read the message header
|
||||
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
||||
.from_err()
|
||||
.and_then(move |(reader, buf)| {
|
||||
trace!(LOGGER, "read_msg: start");
|
||||
|
||||
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
||||
Ok((reader, header))
|
||||
})
|
||||
|
@ -210,14 +224,16 @@ impl Connection {
|
|||
let mut recv_bytes = recv_bytes.lock().unwrap();
|
||||
*recv_bytes += header.serialized_len() + header.msg_len;
|
||||
|
||||
// and handle the different message types
|
||||
let msg_type = header.msg_type;
|
||||
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
|
||||
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
|
||||
return Err(Error::Serialization(e));
|
||||
}
|
||||
pool.spawn_fn(move || {
|
||||
let msg_type = header.msg_type;
|
||||
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
|
||||
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
|
||||
return Err(Error::Serialization(e));
|
||||
}
|
||||
|
||||
Ok(reader)
|
||||
trace!(LOGGER, "read_msg: done (via cpu_pool)");
|
||||
Ok(reader)
|
||||
})
|
||||
})
|
||||
});
|
||||
Box::new(read_msg)
|
||||
|
@ -260,6 +276,7 @@ impl TimeoutConnection {
|
|||
/// Same as Connection
|
||||
pub fn listen<F>(
|
||||
conn: TcpStream,
|
||||
pool: CpuPool,
|
||||
handler: F,
|
||||
) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
|
||||
where
|
||||
|
@ -270,7 +287,7 @@ impl TimeoutConnection {
|
|||
// Decorates the handler to remove the "subscription" from the expected
|
||||
// responses. We got our replies, so no timeout should occur.
|
||||
let exp = expects.clone();
|
||||
let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| {
|
||||
let (conn, fut) = Connection::listen(conn, pool, move |sender, header: MsgHeader, data| {
|
||||
let msg_type = header.msg_type;
|
||||
let recv_h = try!(handler.handle(sender, header, data));
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ extern crate bytes;
|
|||
#[macro_use]
|
||||
extern crate enum_primitive;
|
||||
extern crate futures;
|
||||
extern crate futures_cpupool;
|
||||
|
||||
#[macro_use]
|
||||
extern crate grin_core as core;
|
||||
extern crate grin_store;
|
||||
|
|
|
@ -16,6 +16,7 @@ use std::net::SocketAddr;
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use futures::Future;
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::net::TcpStream;
|
||||
|
||||
use core::core;
|
||||
|
@ -221,6 +222,10 @@ impl TrackingAdapter {
|
|||
}
|
||||
|
||||
impl NetAdapter for TrackingAdapter {
|
||||
fn cpu_pool(&self) -> CpuPool {
|
||||
self.adapter.cpu_pool()
|
||||
}
|
||||
|
||||
fn total_difficulty(&self) -> Difficulty {
|
||||
self.adapter.total_difficulty()
|
||||
}
|
||||
|
|
|
@ -50,7 +50,8 @@ impl Protocol for ProtocolV1 {
|
|||
adapter: Arc<NetAdapter>,
|
||||
addr: SocketAddr,
|
||||
) -> Box<Future<Item = (), Error = Error>> {
|
||||
let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
|
||||
let pool = adapter.cpu_pool();
|
||||
let (conn, listener) = TimeoutConnection::listen(conn, pool, move |sender, header, data| {
|
||||
let adapt = adapter.as_ref();
|
||||
handle_payload(adapt, sender, header, data, addr)
|
||||
});
|
||||
|
@ -152,7 +153,11 @@ fn handle_payload(
|
|||
&MsgHeader::new(Type::Pong, body_data.len() as u64),
|
||||
));
|
||||
data.append(&mut body_data);
|
||||
sender.unbounded_send(data).unwrap();
|
||||
|
||||
if let Err(e) = sender.unbounded_send(data) {
|
||||
debug!(LOGGER, "handle_payload: Ping, error sending: {:?}", e);
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
Type::Pong => {
|
||||
|
@ -180,13 +185,18 @@ fn handle_payload(
|
|||
&MsgHeader::new(Type::Block, body_data.len() as u64),
|
||||
));
|
||||
data.append(&mut body_data);
|
||||
sender.unbounded_send(data).unwrap();
|
||||
if let Err(e) = sender.unbounded_send(data) {
|
||||
debug!(LOGGER, "handle_payload: GetBlock, error sending: {:?}", e);
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
Type::Block => {
|
||||
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
|
||||
let bh = b.hash();
|
||||
|
||||
debug!(LOGGER, "handle_payload: Block {}", bh);
|
||||
|
||||
adapter.block_received(b, addr);
|
||||
Ok(Some(bh))
|
||||
}
|
||||
|
@ -207,7 +217,9 @@ fn handle_payload(
|
|||
&MsgHeader::new(Type::Headers, body_data.len() as u64),
|
||||
));
|
||||
data.append(&mut body_data);
|
||||
sender.unbounded_send(data).unwrap();
|
||||
if let Err(e) = sender.unbounded_send(data) {
|
||||
debug!(LOGGER, "handle_payload: GetHeaders, error sending: {:?}", e);
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
@ -234,7 +246,9 @@ fn handle_payload(
|
|||
&MsgHeader::new(Type::PeerAddrs, body_data.len() as u64),
|
||||
));
|
||||
data.append(&mut body_data);
|
||||
sender.unbounded_send(data).unwrap();
|
||||
if let Err(e) = sender.unbounded_send(data) {
|
||||
debug!(LOGGER, "handle_payload: GetPeerAddrs, error sending: {:?}", e);
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ use std::time::Duration;
|
|||
use futures;
|
||||
use futures::{Future, Stream};
|
||||
use futures::future::{self, IntoFuture};
|
||||
use futures_cpupool::CpuPool;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio_core::net::{TcpListener, TcpStream};
|
||||
use tokio_core::reactor;
|
||||
|
@ -39,8 +40,22 @@ use types::*;
|
|||
use util::LOGGER;
|
||||
|
||||
/// A no-op network adapter used for testing.
|
||||
pub struct DummyAdapter {}
|
||||
pub struct DummyAdapter {
|
||||
cpu_pool: CpuPool,
|
||||
}
|
||||
|
||||
impl DummyAdapter {
|
||||
pub fn new() -> DummyAdapter {
|
||||
DummyAdapter {
|
||||
cpu_pool: CpuPool::new(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetAdapter for DummyAdapter {
|
||||
fn cpu_pool(&self) -> CpuPool {
|
||||
self.cpu_pool.clone()
|
||||
}
|
||||
fn total_difficulty(&self) -> Difficulty {
|
||||
Difficulty::one()
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ use std::net::{IpAddr, SocketAddr};
|
|||
use std::sync::Arc;
|
||||
|
||||
use futures::Future;
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_timer::TimerError;
|
||||
|
||||
|
@ -197,4 +198,7 @@ pub trait NetAdapter: Sync + Send {
|
|||
|
||||
/// Heard total_difficulty from a connected peer (via ping/pong).
|
||||
fn peer_difficulty(&self, SocketAddr, Difficulty);
|
||||
|
||||
/// Central threadpool that we can use to handle requests from all our peers.
|
||||
fn cpu_pool(&self) -> CpuPool;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
// limitations under the License.
|
||||
|
||||
extern crate futures;
|
||||
extern crate futures_cpupool;
|
||||
extern crate grin_core as core;
|
||||
extern crate grin_p2p as p2p;
|
||||
extern crate tokio_core;
|
||||
|
@ -36,7 +37,7 @@ fn peer_handshake() {
|
|||
let mut evtlp = Core::new().unwrap();
|
||||
let handle = evtlp.handle();
|
||||
let p2p_conf = p2p::P2PConfig::default();
|
||||
let net_adapter = Arc::new(p2p::DummyAdapter {});
|
||||
let net_adapter = Arc::new(p2p::DummyAdapter::new());
|
||||
let server = p2p::Server::new(
|
||||
".grin".to_owned(),
|
||||
p2p::UNKNOWN,
|
||||
|
|
Loading…
Reference in a new issue