mirror of
https://github.com/mimblewimble/grin.git
synced 2025-02-01 17:01:09 +03:00
This commit is contained in:
parent
590cb8b8dd
commit
38741c91e2
9 changed files with 89 additions and 32 deletions
|
@ -23,6 +23,7 @@ use std::thread;
|
||||||
use std::time;
|
use std::time;
|
||||||
|
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
|
use cpupool::CpuPool;
|
||||||
use tokio_core::reactor;
|
use tokio_core::reactor;
|
||||||
use tokio_timer::Timer;
|
use tokio_timer::Timer;
|
||||||
|
|
||||||
|
@ -117,12 +118,17 @@ impl Server {
|
||||||
tx_pool.clone(),
|
tx_pool.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
|
// thread pool (single thread) for offloading handler.handle()
|
||||||
|
// work from the main run loop in p2p_server
|
||||||
|
let cpu_pool = CpuPool::new(1);
|
||||||
|
|
||||||
let p2p_server = Arc::new(p2p::Server::new(
|
let p2p_server = Arc::new(p2p::Server::new(
|
||||||
config.db_root.clone(),
|
config.db_root.clone(),
|
||||||
config.capabilities,
|
config.capabilities,
|
||||||
config.p2p_config.unwrap(),
|
config.p2p_config.unwrap(),
|
||||||
net_adapter.clone(),
|
net_adapter.clone(),
|
||||||
genesis.hash(),
|
genesis.hash(),
|
||||||
|
cpu_pool.clone(),
|
||||||
)?);
|
)?);
|
||||||
chain_adapter.init(p2p_server.peers.clone());
|
chain_adapter.init(p2p_server.peers.clone());
|
||||||
pool_net_adapter.init(p2p_server.peers.clone());
|
pool_net_adapter.init(p2p_server.peers.clone());
|
||||||
|
|
|
@ -8,6 +8,7 @@ workspace = ".."
|
||||||
bitflags = "^0.7.0"
|
bitflags = "^0.7.0"
|
||||||
byteorder = "^0.5"
|
byteorder = "^0.5"
|
||||||
futures = "^0.1.15"
|
futures = "^0.1.15"
|
||||||
|
futures-cpupool = "^0.1.3"
|
||||||
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
|
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
|
||||||
net2 = "0.2.0"
|
net2 = "0.2.0"
|
||||||
rand = "^0.3"
|
rand = "^0.3"
|
||||||
|
|
|
@ -21,9 +21,9 @@ use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures;
|
use futures;
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream, stream};
|
||||||
use futures::stream;
|
|
||||||
use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
|
use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
|
||||||
|
use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_io::io::{read_exact, write_all};
|
use tokio_io::io::{read_exact, write_all};
|
||||||
|
@ -94,6 +94,7 @@ impl Connection {
|
||||||
/// itself.
|
/// itself.
|
||||||
pub fn listen<F>(
|
pub fn listen<F>(
|
||||||
conn: TcpStream,
|
conn: TcpStream,
|
||||||
|
pool: CpuPool,
|
||||||
handler: F,
|
handler: F,
|
||||||
) -> (Connection, Box<Future<Item = (), Error = Error>>)
|
) -> (Connection, Box<Future<Item = (), Error = Error>>)
|
||||||
where
|
where
|
||||||
|
@ -124,10 +125,10 @@ impl Connection {
|
||||||
};
|
};
|
||||||
|
|
||||||
// setup the reading future, getting messages from the peer and processing them
|
// setup the reading future, getting messages from the peer and processing them
|
||||||
let read_msg = me.read_msg(tx, reader, handler).map(|_| ());
|
let read_msg = me.read_msg(tx, reader, handler, pool).map(|_| ());
|
||||||
|
|
||||||
// setting the writing future, getting messages from our system and sending
|
// setting the writing future
|
||||||
// them out
|
// getting messages from our system and sending them out
|
||||||
let write_msg = me.write_msg(rx, writer).map(|_| ());
|
let write_msg = me.write_msg(rx, writer).map(|_| ());
|
||||||
|
|
||||||
// select between our different futures and return them
|
// select between our different futures and return them
|
||||||
|
@ -154,16 +155,20 @@ impl Connection {
|
||||||
let sent_bytes = self.sent_bytes.clone();
|
let sent_bytes = self.sent_bytes.clone();
|
||||||
let send_data = rx
|
let send_data = rx
|
||||||
.map_err(|_| Error::ConnectionClose)
|
.map_err(|_| Error::ConnectionClose)
|
||||||
.map(move |data| {
|
.map(move |data| {
|
||||||
// add the count of bytes sent
|
trace!(LOGGER, "write_msg: start");
|
||||||
|
// add the count of bytes sent
|
||||||
let mut sent_bytes = sent_bytes.lock().unwrap();
|
let mut sent_bytes = sent_bytes.lock().unwrap();
|
||||||
*sent_bytes += data.len() as u64;
|
*sent_bytes += data.len() as u64;
|
||||||
data
|
data
|
||||||
})
|
})
|
||||||
// write the data and make sure the future returns the right types
|
// write the data and make sure the future returns the right types
|
||||||
.fold(writer, |writer, data| {
|
.fold(writer, |writer, data| {
|
||||||
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| writer)
|
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| {
|
||||||
});
|
trace!(LOGGER, "write_msg: done");
|
||||||
|
writer
|
||||||
|
})
|
||||||
|
});
|
||||||
Box::new(send_data)
|
Box::new(send_data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,28 +179,37 @@ impl Connection {
|
||||||
sender: UnboundedSender<Vec<u8>>,
|
sender: UnboundedSender<Vec<u8>>,
|
||||||
reader: R,
|
reader: R,
|
||||||
handler: F,
|
handler: F,
|
||||||
|
pool: CpuPool,
|
||||||
) -> Box<Future<Item = R, Error = Error>>
|
) -> Box<Future<Item = R, Error = Error>>
|
||||||
where
|
where
|
||||||
F: Handler + 'static,
|
F: Handler + 'static,
|
||||||
R: AsyncRead + 'static,
|
R: AsyncRead + Send + 'static,
|
||||||
{
|
{
|
||||||
// infinite iterator stream so we repeat the message reading logic until the
|
// infinite iterator stream so we repeat the message reading logic until the
|
||||||
// peer is stopped
|
// peer is stopped
|
||||||
let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>));
|
let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>));
|
||||||
|
|
||||||
// setup the reading future, getting messages from the peer and processing them
|
// setup the reading future, getting messages from the peer and processing them
|
||||||
let recv_bytes = self.received_bytes.clone();
|
let recv_bytes = self.received_bytes.clone();
|
||||||
let handler = Arc::new(handler);
|
let handler = Arc::new(handler);
|
||||||
|
|
||||||
let read_msg = iter.fold(reader, move |reader, _| {
|
let mut count = 0;
|
||||||
|
|
||||||
|
let read_msg = iter.buffered(1).fold(reader, move |reader, _| {
|
||||||
|
count += 1;
|
||||||
|
trace!(LOGGER, "read_msg: count (per buffered fold): {}", count);
|
||||||
|
|
||||||
let recv_bytes = recv_bytes.clone();
|
let recv_bytes = recv_bytes.clone();
|
||||||
let handler = handler.clone();
|
let handler = handler.clone();
|
||||||
let sender_inner = sender.clone();
|
let sender_inner = sender.clone();
|
||||||
|
let pool = pool.clone();
|
||||||
|
|
||||||
// first read the message header
|
// first read the message header
|
||||||
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
read_exact(reader, vec![0u8; HEADER_LEN as usize])
|
||||||
.from_err()
|
.from_err()
|
||||||
.and_then(move |(reader, buf)| {
|
.and_then(move |(reader, buf)| {
|
||||||
|
trace!(LOGGER, "read_msg: start");
|
||||||
|
|
||||||
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
|
||||||
Ok((reader, header))
|
Ok((reader, header))
|
||||||
})
|
})
|
||||||
|
@ -210,14 +224,16 @@ impl Connection {
|
||||||
let mut recv_bytes = recv_bytes.lock().unwrap();
|
let mut recv_bytes = recv_bytes.lock().unwrap();
|
||||||
*recv_bytes += header.serialized_len() + header.msg_len;
|
*recv_bytes += header.serialized_len() + header.msg_len;
|
||||||
|
|
||||||
// and handle the different message types
|
pool.spawn_fn(move || {
|
||||||
let msg_type = header.msg_type;
|
let msg_type = header.msg_type;
|
||||||
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
|
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
|
||||||
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
|
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
|
||||||
return Err(Error::Serialization(e));
|
return Err(Error::Serialization(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(reader)
|
trace!(LOGGER, "read_msg: done (via cpu_pool)");
|
||||||
|
Ok(reader)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
Box::new(read_msg)
|
Box::new(read_msg)
|
||||||
|
@ -260,6 +276,7 @@ impl TimeoutConnection {
|
||||||
/// Same as Connection
|
/// Same as Connection
|
||||||
pub fn listen<F>(
|
pub fn listen<F>(
|
||||||
conn: TcpStream,
|
conn: TcpStream,
|
||||||
|
pool: CpuPool,
|
||||||
handler: F,
|
handler: F,
|
||||||
) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
|
) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
|
||||||
where
|
where
|
||||||
|
@ -270,7 +287,7 @@ impl TimeoutConnection {
|
||||||
// Decorates the handler to remove the "subscription" from the expected
|
// Decorates the handler to remove the "subscription" from the expected
|
||||||
// responses. We got our replies, so no timeout should occur.
|
// responses. We got our replies, so no timeout should occur.
|
||||||
let exp = expects.clone();
|
let exp = expects.clone();
|
||||||
let (conn, fut) = Connection::listen(conn, move |sender, header: MsgHeader, data| {
|
let (conn, fut) = Connection::listen(conn, pool, move |sender, header: MsgHeader, data| {
|
||||||
let msg_type = header.msg_type;
|
let msg_type = header.msg_type;
|
||||||
let recv_h = try!(handler.handle(sender, header, data));
|
let recv_h = try!(handler.handle(sender, header, data));
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,8 @@ extern crate bytes;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate enum_primitive;
|
extern crate enum_primitive;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
|
extern crate futures_cpupool;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate grin_core as core;
|
extern crate grin_core as core;
|
||||||
extern crate grin_store;
|
extern crate grin_store;
|
||||||
|
|
|
@ -16,6 +16,7 @@ use std::net::SocketAddr;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
|
|
||||||
use core::core;
|
use core::core;
|
||||||
|
@ -89,12 +90,12 @@ impl Peer {
|
||||||
|
|
||||||
/// Main peer loop listening for messages and forwarding to the rest of the
|
/// Main peer loop listening for messages and forwarding to the rest of the
|
||||||
/// system.
|
/// system.
|
||||||
pub fn run(&self, conn: TcpStream) -> Box<Future<Item = (), Error = Error>> {
|
pub fn run(&self, conn: TcpStream, pool: CpuPool) -> Box<Future<Item = (), Error = Error>> {
|
||||||
let addr = self.info.addr;
|
let addr = self.info.addr;
|
||||||
let state = self.state.clone();
|
let state = self.state.clone();
|
||||||
let adapter = Arc::new(self.tracking_adapter.clone());
|
let adapter = Arc::new(self.tracking_adapter.clone());
|
||||||
|
|
||||||
Box::new(self.proto.handle(conn, adapter, addr).then(move |res| {
|
Box::new(self.proto.handle(conn, adapter, addr, pool).then(move |res| {
|
||||||
// handle disconnection, standard disconnections aren't considered an error
|
// handle disconnection, standard disconnections aren't considered an error
|
||||||
let mut state = state.write().unwrap();
|
let mut state = state.write().unwrap();
|
||||||
match res {
|
match res {
|
||||||
|
|
|
@ -17,6 +17,7 @@ use std::net::SocketAddr;
|
||||||
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use futures::sync::mpsc::UnboundedSender;
|
use futures::sync::mpsc::UnboundedSender;
|
||||||
|
use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
|
|
||||||
use core::core;
|
use core::core;
|
||||||
|
@ -49,8 +50,9 @@ impl Protocol for ProtocolV1 {
|
||||||
conn: TcpStream,
|
conn: TcpStream,
|
||||||
adapter: Arc<NetAdapter>,
|
adapter: Arc<NetAdapter>,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
|
pool: CpuPool,
|
||||||
) -> Box<Future<Item = (), Error = Error>> {
|
) -> Box<Future<Item = (), Error = Error>> {
|
||||||
let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
|
let (conn, listener) = TimeoutConnection::listen(conn, pool, move |sender, header, data| {
|
||||||
let adapt = adapter.as_ref();
|
let adapt = adapter.as_ref();
|
||||||
handle_payload(adapt, sender, header, data, addr)
|
handle_payload(adapt, sender, header, data, addr)
|
||||||
});
|
});
|
||||||
|
@ -156,7 +158,11 @@ fn handle_payload(
|
||||||
&MsgHeader::new(Type::Pong, body_data.len() as u64),
|
&MsgHeader::new(Type::Pong, body_data.len() as u64),
|
||||||
));
|
));
|
||||||
data.append(&mut body_data);
|
data.append(&mut body_data);
|
||||||
sender.unbounded_send(data).unwrap();
|
|
||||||
|
if let Err(e) = sender.unbounded_send(data) {
|
||||||
|
debug!(LOGGER, "handle_payload: Ping, error sending: {:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
Type::Pong => {
|
Type::Pong => {
|
||||||
|
@ -184,13 +190,18 @@ fn handle_payload(
|
||||||
&MsgHeader::new(Type::Block, body_data.len() as u64),
|
&MsgHeader::new(Type::Block, body_data.len() as u64),
|
||||||
));
|
));
|
||||||
data.append(&mut body_data);
|
data.append(&mut body_data);
|
||||||
sender.unbounded_send(data).unwrap();
|
if let Err(e) = sender.unbounded_send(data) {
|
||||||
|
debug!(LOGGER, "handle_payload: GetBlock, error sending: {:?}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
Type::Block => {
|
Type::Block => {
|
||||||
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
|
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
|
||||||
let bh = b.hash();
|
let bh = b.hash();
|
||||||
|
|
||||||
|
debug!(LOGGER, "handle_payload: Block {}", bh);
|
||||||
|
|
||||||
adapter.block_received(b, addr);
|
adapter.block_received(b, addr);
|
||||||
Ok(Some(bh))
|
Ok(Some(bh))
|
||||||
}
|
}
|
||||||
|
@ -211,7 +222,9 @@ fn handle_payload(
|
||||||
&MsgHeader::new(Type::Headers, body_data.len() as u64),
|
&MsgHeader::new(Type::Headers, body_data.len() as u64),
|
||||||
));
|
));
|
||||||
data.append(&mut body_data);
|
data.append(&mut body_data);
|
||||||
sender.unbounded_send(data).unwrap();
|
if let Err(e) = sender.unbounded_send(data) {
|
||||||
|
debug!(LOGGER, "handle_payload: GetHeaders, error sending: {:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
@ -238,7 +251,9 @@ fn handle_payload(
|
||||||
&MsgHeader::new(Type::PeerAddrs, body_data.len() as u64),
|
&MsgHeader::new(Type::PeerAddrs, body_data.len() as u64),
|
||||||
));
|
));
|
||||||
data.append(&mut body_data);
|
data.append(&mut body_data);
|
||||||
sender.unbounded_send(data).unwrap();
|
if let Err(e) = sender.unbounded_send(data) {
|
||||||
|
debug!(LOGGER, "handle_payload: GetPeerAddrs, error sending: {:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ use std::time::Duration;
|
||||||
use futures;
|
use futures;
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
use futures::future::{self, IntoFuture};
|
use futures::future::{self, IntoFuture};
|
||||||
|
use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::{TcpListener, TcpStream};
|
use tokio_core::net::{TcpListener, TcpStream};
|
||||||
use tokio_core::reactor;
|
use tokio_core::reactor;
|
||||||
use tokio_timer::Timer;
|
use tokio_timer::Timer;
|
||||||
|
@ -39,6 +40,7 @@ use util::LOGGER;
|
||||||
|
|
||||||
/// A no-op network adapter used for testing.
|
/// A no-op network adapter used for testing.
|
||||||
pub struct DummyAdapter {}
|
pub struct DummyAdapter {}
|
||||||
|
|
||||||
impl ChainAdapter for DummyAdapter {
|
impl ChainAdapter for DummyAdapter {
|
||||||
fn total_difficulty(&self) -> Difficulty {
|
fn total_difficulty(&self) -> Difficulty {
|
||||||
Difficulty::one()
|
Difficulty::one()
|
||||||
|
@ -56,6 +58,7 @@ impl ChainAdapter for DummyAdapter {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetAdapter for DummyAdapter {
|
impl NetAdapter for DummyAdapter {
|
||||||
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
|
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
|
||||||
vec![]
|
vec![]
|
||||||
|
@ -71,6 +74,7 @@ pub struct Server {
|
||||||
capabilities: Capabilities,
|
capabilities: Capabilities,
|
||||||
handshake: Arc<Handshake>,
|
handshake: Arc<Handshake>,
|
||||||
pub peers: Peers,
|
pub peers: Peers,
|
||||||
|
pool: CpuPool,
|
||||||
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
|
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,12 +90,14 @@ impl Server {
|
||||||
config: P2PConfig,
|
config: P2PConfig,
|
||||||
adapter: Arc<ChainAdapter>,
|
adapter: Arc<ChainAdapter>,
|
||||||
genesis: Hash,
|
genesis: Hash,
|
||||||
|
pool: CpuPool,
|
||||||
) -> Result<Server, Error> {
|
) -> Result<Server, Error> {
|
||||||
Ok(Server {
|
Ok(Server {
|
||||||
config: config,
|
config: config,
|
||||||
capabilities: capab,
|
capabilities: capab,
|
||||||
handshake: Arc::new(Handshake::new(genesis)),
|
handshake: Arc::new(Handshake::new(genesis)),
|
||||||
peers: Peers::new(PeerStore::new(db_root)?, adapter),
|
peers: Peers::new(PeerStore::new(db_root)?, adapter),
|
||||||
|
pool: pool,
|
||||||
stop: RefCell::new(None),
|
stop: RefCell::new(None),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -106,6 +112,7 @@ impl Server {
|
||||||
let handshake = self.handshake.clone();
|
let handshake = self.handshake.clone();
|
||||||
let peers = self.peers.clone();
|
let peers = self.peers.clone();
|
||||||
let capab = self.capabilities.clone();
|
let capab = self.capabilities.clone();
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
|
||||||
// main peer acceptance future handling handshake
|
// main peer acceptance future handling handshake
|
||||||
let hp = h.clone();
|
let hp = h.clone();
|
||||||
|
@ -116,6 +123,7 @@ impl Server {
|
||||||
let peers2 = peers.clone();
|
let peers2 = peers.clone();
|
||||||
let handshake = handshake.clone();
|
let handshake = handshake.clone();
|
||||||
let hp = hp.clone();
|
let hp = hp.clone();
|
||||||
|
let pool = pool.clone();
|
||||||
|
|
||||||
future::ok(conn).and_then(move |conn| {
|
future::ok(conn).and_then(move |conn| {
|
||||||
// Refuse banned peers connection
|
// Refuse banned peers connection
|
||||||
|
@ -149,7 +157,7 @@ impl Server {
|
||||||
// run the main peer protocol
|
// run the main peer protocol
|
||||||
timed_peer.and_then(move |(conn, peer)| {
|
timed_peer.and_then(move |(conn, peer)| {
|
||||||
let peer = peer.read().unwrap();
|
let peer = peer.read().unwrap();
|
||||||
peer.run(conn)
|
peer.run(conn, pool)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
@ -220,6 +228,8 @@ impl Server {
|
||||||
let peers = self.peers.clone();
|
let peers = self.peers.clone();
|
||||||
let handshake = self.handshake.clone();
|
let handshake = self.handshake.clone();
|
||||||
let capab = self.capabilities.clone();
|
let capab = self.capabilities.clone();
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
|
||||||
let self_addr = SocketAddr::new(self.config.host, self.config.port);
|
let self_addr = SocketAddr::new(self.config.host, self.config.port);
|
||||||
|
|
||||||
let timer = Timer::default();
|
let timer = Timer::default();
|
||||||
|
@ -251,7 +261,7 @@ impl Server {
|
||||||
})
|
})
|
||||||
.and_then(move |(socket, peer)| {
|
.and_then(move |(socket, peer)| {
|
||||||
let peer_inner = peer.read().unwrap();
|
let peer_inner = peer.read().unwrap();
|
||||||
h2.spawn(peer_inner.run(socket).map_err(|e| {
|
h2.spawn(peer_inner.run(socket, pool).map_err(|e| {
|
||||||
error!(LOGGER, "Peer error: {:?}", e);
|
error!(LOGGER, "Peer error: {:?}", e);
|
||||||
()
|
()
|
||||||
}));
|
}));
|
||||||
|
|
|
@ -18,6 +18,7 @@ use std::net::{IpAddr, SocketAddr};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
use tokio_timer::TimerError;
|
use tokio_timer::TimerError;
|
||||||
|
|
||||||
|
@ -131,7 +132,7 @@ pub trait Protocol {
|
||||||
/// be known already, usually passed during construction. Will typically
|
/// be known already, usually passed during construction. Will typically
|
||||||
/// block so needs to be called withing a coroutine. Should also be called
|
/// block so needs to be called withing a coroutine. Should also be called
|
||||||
/// only once.
|
/// only once.
|
||||||
fn handle(&self, conn: TcpStream, na: Arc<NetAdapter>, addr: SocketAddr)
|
fn handle(&self, conn: TcpStream, na: Arc<NetAdapter>, addr: SocketAddr, pool: CpuPool)
|
||||||
-> Box<Future<Item = (), Error = Error>>;
|
-> Box<Future<Item = (), Error = Error>>;
|
||||||
|
|
||||||
/// Sends a ping message to the remote peer.
|
/// Sends a ping message to the remote peer.
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
|
extern crate futures_cpupool;
|
||||||
extern crate grin_core as core;
|
extern crate grin_core as core;
|
||||||
extern crate grin_p2p as p2p;
|
extern crate grin_p2p as p2p;
|
||||||
extern crate tokio_core;
|
extern crate tokio_core;
|
||||||
|
@ -22,6 +23,7 @@ use std::sync::Arc;
|
||||||
use std::time;
|
use std::time;
|
||||||
|
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
|
use futures_cpupool::CpuPool;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
use tokio_core::reactor::{self, Core};
|
use tokio_core::reactor::{self, Core};
|
||||||
|
|
||||||
|
@ -37,12 +39,14 @@ fn peer_handshake() {
|
||||||
let handle = evtlp.handle();
|
let handle = evtlp.handle();
|
||||||
let p2p_conf = p2p::P2PConfig::default();
|
let p2p_conf = p2p::P2PConfig::default();
|
||||||
let net_adapter = Arc::new(p2p::DummyAdapter {});
|
let net_adapter = Arc::new(p2p::DummyAdapter {});
|
||||||
|
let pool = CpuPool::new(1);
|
||||||
let server = p2p::Server::new(
|
let server = p2p::Server::new(
|
||||||
".grin".to_owned(),
|
".grin".to_owned(),
|
||||||
p2p::UNKNOWN,
|
p2p::UNKNOWN,
|
||||||
p2p_conf,
|
p2p_conf,
|
||||||
net_adapter.clone(),
|
net_adapter.clone(),
|
||||||
Hash::from_vec(vec![]),
|
Hash::from_vec(vec![]),
|
||||||
|
pool.clone(),
|
||||||
).unwrap();
|
).unwrap();
|
||||||
let run_server = server.start(handle.clone());
|
let run_server = server.start(handle.clone());
|
||||||
let my_addr = "127.0.0.1:5000".parse().unwrap();
|
let my_addr = "127.0.0.1:5000".parse().unwrap();
|
||||||
|
@ -71,7 +75,7 @@ fn peer_handshake() {
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.and_then(move |(socket, peer)| {
|
.and_then(move |(socket, peer)| {
|
||||||
rhandle.spawn(peer.run(socket).map_err(|e| {
|
rhandle.spawn(peer.run(socket, pool).map_err(|e| {
|
||||||
panic!("Client run failed: {:?}", e);
|
panic!("Client run failed: {:?}", e);
|
||||||
}));
|
}));
|
||||||
peer.send_ping(Difficulty::one(), 0).unwrap();
|
peer.send_ping(Difficulty::one(), 0).unwrap();
|
||||||
|
|
Loading…
Reference in a new issue