No send back for blocks and transactions (#212)

* Do not send txs and blocks to senders. Keeps a ring buffer of transaction and blocks hashes that a peer has received. Do not send what we've already received.
* Test fix and fmt
This commit is contained in:
Ignotus Peverell 2017-10-26 17:48:51 +00:00 committed by GitHub
parent 68bcd79da5
commit efe414bf07
10 changed files with 306 additions and 165 deletions

View file

@ -43,22 +43,26 @@ pub trait Handler: Sync + Send {
/// Handle function to implement to process incoming messages. A sender to /// Handle function to implement to process incoming messages. A sender to
/// reply immediately as well as the message header and its unparsed body /// reply immediately as well as the message header and its unparsed body
/// are provided. /// are provided.
fn handle(&self, fn handle(
&self,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
header: MsgHeader, header: MsgHeader,
body: Vec<u8>) body: Vec<u8>,
-> Result<Option<Hash>, ser::Error>; ) -> Result<Option<Hash>, ser::Error>;
} }
impl<F> Handler for F impl<F> Handler for F
where F: Fn(UnboundedSender<Vec<u8>>, MsgHeader, Vec<u8>) -> Result<Option<Hash>, ser::Error>, where
F: Sync + Send F: Fn(UnboundedSender<Vec<u8>>, MsgHeader, Vec<u8>)
-> Result<Option<Hash>, ser::Error>,
F: Sync + Send,
{ {
fn handle(&self, fn handle(
&self,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
header: MsgHeader, header: MsgHeader,
body: Vec<u8>) body: Vec<u8>,
-> Result<Option<Hash>, ser::Error> { ) -> Result<Option<Hash>, ser::Error> {
self(sender, header, body) self(sender, header, body)
} }
} }
@ -88,10 +92,12 @@ impl Connection {
/// Start listening on the provided connection and wraps it. Does not hang /// Start listening on the provided connection and wraps it. Does not hang
/// the current thread, instead just returns a future and the Connection /// the current thread, instead just returns a future and the Connection
/// itself. /// itself.
pub fn listen<F>(conn: TcpStream, pub fn listen<F>(
handler: F) conn: TcpStream,
-> (Connection, Box<Future<Item = (), Error = Error>>) handler: F,
where F: Handler + 'static ) -> (Connection, Box<Future<Item = (), Error = Error>>)
where
F: Handler + 'static,
{ {
let (reader, writer) = conn.split(); let (reader, writer) = conn.split();
@ -106,9 +112,9 @@ impl Connection {
// same for closing the connection // same for closing the connection
let (close_tx, close_rx) = futures::sync::mpsc::channel(1); let (close_tx, close_rx) = futures::sync::mpsc::channel(1);
let close_conn = close_rx let close_conn = close_rx.for_each(|_| Ok(())).map_err(
.for_each(|_| Ok(())) |_| Error::ConnectionClose,
.map_err(|_| Error::ConnectionClose); );
let me = Connection { let me = Connection {
outbound_chan: tx.clone(), outbound_chan: tx.clone(),
@ -138,11 +144,13 @@ impl Connection {
/// Prepares the future that gets message data produced by our system and /// Prepares the future that gets message data produced by our system and
/// sends it to the peer connection /// sends it to the peer connection
fn write_msg<W>(&self, fn write_msg<W>(
&self,
rx: UnboundedReceiver<Vec<u8>>, rx: UnboundedReceiver<Vec<u8>>,
writer: W) writer: W,
-> Box<Future<Item = W, Error = Error>> ) -> Box<Future<Item = W, Error = Error>>
where W: AsyncWrite + 'static where
W: AsyncWrite + 'static,
{ {
let sent_bytes = self.sent_bytes.clone(); let sent_bytes = self.sent_bytes.clone();
@ -163,13 +171,15 @@ impl Connection {
/// Prepares the future reading from the peer connection, parsing each /// Prepares the future reading from the peer connection, parsing each
/// message and forwarding them appropriately based on their type /// message and forwarding them appropriately based on their type
fn read_msg<F, R>(&self, fn read_msg<F, R>(
&self,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
reader: R, reader: R,
handler: F) handler: F,
-> Box<Future<Item = R, Error = Error>> ) -> Box<Future<Item = R, Error = Error>>
where F: Handler + 'static, where
R: AsyncRead + 'static F: Handler + 'static,
R: AsyncRead + 'static,
{ {
// infinite iterator stream so we repeat the message reading logic until the // infinite iterator stream so we repeat the message reading logic until the
@ -223,12 +233,15 @@ impl Connection {
let mut body_data = vec![]; let mut body_data = vec![];
try!(ser::serialize(&mut body_data, body)); try!(ser::serialize(&mut body_data, body));
let mut data = vec![]; let mut data = vec![];
try!(ser::serialize(&mut data, &MsgHeader::new(t, body_data.len() as u64))); try!(ser::serialize(
&mut data,
&MsgHeader::new(t, body_data.len() as u64),
));
data.append(&mut body_data); data.append(&mut body_data);
self.outbound_chan self.outbound_chan.unbounded_send(data).map_err(|_| {
.unbounded_send(data) Error::ConnectionClose
.map_err(|_| Error::ConnectionClose) })
} }
/// Bytes sent and received by this peer to the remote peer. /// Bytes sent and received by this peer to the remote peer.
@ -249,10 +262,12 @@ pub struct TimeoutConnection {
impl TimeoutConnection { impl TimeoutConnection {
/// Same as Connection /// Same as Connection
pub fn listen<F>(conn: TcpStream, pub fn listen<F>(
handler: F) conn: TcpStream,
-> (TimeoutConnection, Box<Future<Item = (), Error = Error>>) handler: F,
where F: Handler + 'static ) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
where
F: Handler + 'static,
{ {
let expects = Arc::new(Mutex::new(vec![])); let expects = Arc::new(Mutex::new(vec![]));
@ -296,17 +311,21 @@ impl TimeoutConnection {
underlying: conn, underlying: conn,
expected_responses: expects, expected_responses: expects,
}; };
(me, Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, _)| e1))) (
me,
Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, _)| e1)),
)
} }
/// Sends a request and registers a timer on the provided message type and /// Sends a request and registers a timer on the provided message type and
/// optionally the hash of the sent data. /// optionally the hash of the sent data.
pub fn send_request<W: ser::Writeable>(&self, pub fn send_request<W: ser::Writeable>(
&self,
t: Type, t: Type,
rt: Type, rt: Type,
body: &W, body: &W,
expect_h: Option<(Hash)>) expect_h: Option<(Hash)>,
-> Result<(), Error> { ) -> Result<(), Error> {
let _sent = try!(self.underlying.send_msg(t, body)); let _sent = try!(self.underlying.send_msg(t, body));
let mut expects = self.expected_responses.lock().unwrap(); let mut expects = self.expected_responses.lock().unwrap();

View file

@ -48,12 +48,13 @@ impl Handshake {
} }
/// Handles connecting to a new remote peer, starting the version handshake. /// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect(&self, pub fn connect(
&self,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
self_addr: SocketAddr, self_addr: SocketAddr,
conn: TcpStream) conn: TcpStream,
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> { ) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
// prepare the first part of the hanshake // prepare the first part of the hanshake
let nonce = self.next_nonce(); let nonce = self.next_nonce();
let hand = Hand { let hand = Hand {
@ -95,11 +96,12 @@ impl Handshake {
/// Handles receiving a connection from a new remote peer that started the /// Handles receiving a connection from a new remote peer that started the
/// version handshake. /// version handshake.
pub fn handshake(&self, pub fn handshake(
&self,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
conn: TcpStream) conn: TcpStream,
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> { ) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonces = self.nonces.clone(); let nonces = self.nonces.clone();
Box::new( Box::new(
read_msg::<Hand>(conn) read_msg::<Hand>(conn)

View file

@ -70,7 +70,8 @@ enum_from_primitive! {
/// the header first, handles its validation and then reads the Readable body, /// the header first, handles its validation and then reads the Readable body,
/// allocating buffers of the right size. /// allocating buffers of the right size.
pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error = Error>> pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error = Error>>
where T: Readable + 'static where
T: Readable + 'static,
{ {
let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize]) let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize])
.from_err() .from_err()
@ -98,11 +99,13 @@ pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error =
/// Future combinator to write a full message from a Writeable payload. /// Future combinator to write a full message from a Writeable payload.
/// Serializes the payload first and then sends the message header and that /// Serializes the payload first and then sends the message header and that
/// payload. /// payload.
pub fn write_msg<T>(conn: TcpStream, pub fn write_msg<T>(
conn: TcpStream,
msg: T, msg: T,
msg_type: Type) msg_type: Type,
-> Box<Future<Item = TcpStream, Error = Error>> ) -> Box<Future<Item = TcpStream, Error = Error>>
where T: Writeable + 'static where
T: Writeable + 'static,
{ {
let write_msg = ok((conn)).and_then(move |conn| { let write_msg = ok((conn)).and_then(move |conn| {
// prepare the body first so we know its serialized length // prepare the body first so we know its serialized length
@ -223,7 +226,9 @@ impl Readable for Hand {
let receiver_addr = try!(SockAddr::read(reader)); let receiver_addr = try!(SockAddr::read(reader));
let ua = try!(reader.read_vec()); let ua = try!(reader.read_vec());
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(
ser::Error::CorruptedData,
));
Ok(Hand { Ok(Hand {
version: version, version: version,
capabilities: capabilities, capabilities: capabilities,
@ -270,7 +275,9 @@ impl Readable for Shake {
let total_diff = try!(Difficulty::read(reader)); let total_diff = try!(Difficulty::read(reader));
let ua = try!(reader.read_vec()); let ua = try!(reader.read_vec());
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(
ser::Error::CorruptedData,
));
Ok(Shake { Ok(Shake {
version: version, version: version,
capabilities: capabilities, capabilities: capabilities,
@ -295,7 +302,9 @@ impl Writeable for GetPeerAddrs {
impl Readable for GetPeerAddrs { impl Readable for GetPeerAddrs {
fn read(reader: &mut Reader) -> Result<GetPeerAddrs, ser::Error> { fn read(reader: &mut Reader) -> Result<GetPeerAddrs, ser::Error> {
let capab = try!(reader.read_u32()); let capab = try!(reader.read_u32());
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(
ser::Error::CorruptedData,
));
Ok(GetPeerAddrs { capabilities: capabilities }) Ok(GetPeerAddrs { capabilities: capabilities })
} }
} }
@ -352,7 +361,9 @@ impl Writeable for PeerError {
impl Readable for PeerError { impl Readable for PeerError {
fn read(reader: &mut Reader) -> Result<PeerError, ser::Error> { fn read(reader: &mut Reader) -> Result<PeerError, ser::Error> {
let (code, msg) = ser_multiread!(reader, read_u32, read_vec); let (code, msg) = ser_multiread!(reader, read_u32, read_vec);
let message = try!(String::from_utf8(msg).map_err(|_| ser::Error::CorruptedData)); let message = try!(String::from_utf8(msg).map_err(
|_| ser::Error::CorruptedData,
));
Ok(PeerError { Ok(PeerError {
code: code, code: code,
message: message, message: message,

View file

@ -19,12 +19,14 @@ use futures::Future;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use core::core; use core::core;
use core::core::hash::Hash; use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty; use core::core::target::Difficulty;
use handshake::Handshake; use handshake::Handshake;
use types::*; use types::*;
use util::LOGGER; use util::LOGGER;
const MAX_TRACK_SIZE: usize = 30;
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State { enum State {
Connected, Connected,
@ -36,59 +38,66 @@ pub struct Peer {
pub info: PeerInfo, pub info: PeerInfo,
proto: Box<Protocol>, proto: Box<Protocol>,
state: Arc<RwLock<State>>, state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter,
} }
unsafe impl Sync for Peer {} unsafe impl Sync for Peer {}
unsafe impl Send for Peer {} unsafe impl Send for Peer {}
impl Peer { impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, proto: Box<Protocol>, na: Arc<NetAdapter>) -> Peer {
Peer {
info: info,
proto: proto,
state: Arc::new(RwLock::new(State::Connected)),
tracking_adapter: TrackingAdapter::new(na),
}
}
/// Initiates the handshake with another peer. /// Initiates the handshake with another peer.
pub fn connect(conn: TcpStream, pub fn connect(
conn: TcpStream,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
self_addr: SocketAddr, self_addr: SocketAddr,
hs: &Handshake) hs: &Handshake,
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> { na: Arc<NetAdapter>,
) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn) let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn)
.and_then(|(conn, proto, info)| { .and_then(|(conn, proto, info)| {
Ok((conn, Ok((conn, Peer::new(info, Box::new(proto), na)))
Peer {
info: info,
proto: Box::new(proto),
state: Arc::new(RwLock::new(State::Connected)),
}))
}); });
Box::new(connect_peer) Box::new(connect_peer)
} }
/// Accept a handshake initiated by another peer. /// Accept a handshake initiated by another peer.
pub fn accept(conn: TcpStream, pub fn accept(
conn: TcpStream,
capab: Capabilities, capab: Capabilities,
total_difficulty: Difficulty, total_difficulty: Difficulty,
hs: &Handshake) hs: &Handshake,
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> { na: Arc<NetAdapter>,
let hs_peer = hs.handshake(capab, total_difficulty, conn) ) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
.and_then(|(conn, proto, info)| { let hs_peer = hs.handshake(capab, total_difficulty, conn).and_then(
Ok((conn, |(conn,
Peer { proto,
info: info, info)| {
proto: Box::new(proto), Ok((conn, Peer::new(info, Box::new(proto), na)))
state: Arc::new(RwLock::new(State::Connected)), },
})) );
});
Box::new(hs_peer) Box::new(hs_peer)
} }
/// Main peer loop listening for messages and forwarding to the rest of the /// Main peer loop listening for messages and forwarding to the rest of the
/// system. /// system.
pub fn run(&self, pub fn run(&self, conn: TcpStream) -> Box<Future<Item = (), Error = Error>> {
conn: TcpStream,
na: Arc<NetAdapter>)
-> Box<Future<Item = (), Error = Error>> {
let addr = self.info.addr; let addr = self.info.addr;
let state = self.state.clone(); let state = self.state.clone();
Box::new(self.proto.handle(conn, na).then(move |res| { let adapter = Arc::new(self.tracking_adapter.clone());
Box::new(self.proto.handle(conn, adapter).then(move |res| {
// handle disconnection, standard disconnections aren't considered an error // handle disconnection, standard disconnections aren't considered an error
let mut state = state.write().unwrap(); let mut state = state.write().unwrap();
match res { match res {
@ -135,15 +144,21 @@ impl Peer {
/// Sends the provided block to the remote peer. The request may be dropped /// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block. /// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<(), Error> { pub fn send_block(&self, b: &core::Block) -> Result<(), Error> {
// TODO do not send if the peer sent us the block in the first place if !self.tracking_adapter.has(b.hash()) {
self.proto.send_block(b) self.proto.send_block(b)
} else {
Ok(())
}
} }
/// Sends the provided transaction to the remote peer. The request may be /// Sends the provided transaction to the remote peer. The request may be
/// dropped if the remote peer is known to already have the transaction. /// dropped if the remote peer is known to already have the transaction.
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
// TODO do not send if the peer sent us the tx in the first place if !self.tracking_adapter.has(tx.hash()) {
self.proto.send_transaction(tx) self.proto.send_transaction(tx)
} else {
Ok(())
}
} }
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> { pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
@ -169,3 +184,75 @@ impl Peer {
self.proto.close(); self.proto.close();
} }
} }
/// Adapter implementation that forwards everything to an underlying adapter
/// but keeps track of the block and transaction hashes that were received.
#[derive(Clone)]
struct TrackingAdapter {
adapter: Arc<NetAdapter>,
known: Arc<RwLock<Vec<Hash>>>,
}
impl TrackingAdapter {
fn new(adapter: Arc<NetAdapter>) -> TrackingAdapter {
TrackingAdapter {
adapter: adapter,
known: Arc::new(RwLock::new(vec![])),
}
}
fn has(&self, hash: Hash) -> bool {
let known = self.known.read().unwrap();
// may become too slow, an ordered set (by timestamp for eviction) may
// end up being a better choice
known.contains(&hash)
}
fn push(&self, hash: Hash) {
let mut known = self.known.write().unwrap();
if known.len() > MAX_TRACK_SIZE {
known.truncate(MAX_TRACK_SIZE);
}
known.insert(0, hash);
}
}
impl NetAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Difficulty {
self.adapter.total_difficulty()
}
fn transaction_received(&self, tx: core::Transaction) {
self.push(tx.hash());
self.adapter.transaction_received(tx)
}
fn block_received(&self, b: core::Block) {
self.push(b.hash());
self.adapter.block_received(b)
}
fn headers_received(&self, bh: Vec<core::BlockHeader>) {
self.adapter.headers_received(bh)
}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {
self.adapter.locate_headers(locator)
}
fn get_block(&self, h: Hash) -> Option<core::Block> {
self.adapter.get_block(h)
}
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
self.adapter.find_peer_addrs(capab)
}
fn peer_addrs_received(&self, addrs: Vec<SocketAddr>) {
self.adapter.peer_addrs_received(addrs)
}
fn peer_connected(&self, pi: &PeerInfo) {
self.adapter.peer_connected(pi)
}
}

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::{Mutex, Arc}; use std::sync::Arc;
use futures::Future; use futures::Future;
use futures::sync::mpsc::UnboundedSender; use futures::sync::mpsc::UnboundedSender;
@ -30,25 +30,21 @@ use util::OneTime;
#[allow(dead_code)] #[allow(dead_code)]
pub struct ProtocolV1 { pub struct ProtocolV1 {
conn: OneTime<TimeoutConnection>, conn: OneTime<TimeoutConnection>,
expected_responses: Mutex<Vec<(Type, Hash)>>,
} }
impl ProtocolV1 { impl ProtocolV1 {
pub fn new() -> ProtocolV1 { pub fn new() -> ProtocolV1 {
ProtocolV1 { ProtocolV1 { conn: OneTime::new() }
conn: OneTime::new(),
expected_responses: Mutex::new(vec![]),
}
} }
} }
impl Protocol for ProtocolV1 { impl Protocol for ProtocolV1 {
/// Sets up the protocol reading, writing and closing logic. /// Sets up the protocol reading, writing and closing logic.
fn handle(&self, fn handle(
&self,
conn: TcpStream, conn: TcpStream,
adapter: Arc<NetAdapter>) adapter: Arc<NetAdapter>,
-> Box<Future<Item = (), Error = Error>> { ) -> Box<Future<Item = (), Error = Error>> {
let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| { let (conn, listener) = TimeoutConnection::listen(conn, move |sender, header, data| {
let adapt = adapter.as_ref(); let adapt = adapter.as_ref();
@ -114,21 +110,23 @@ impl ProtocolV1 {
self.conn.borrow().send_msg(t, body) self.conn.borrow().send_msg(t, body)
} }
fn send_request<W: ser::Writeable>(&self, fn send_request<W: ser::Writeable>(
&self,
t: Type, t: Type,
rt: Type, rt: Type,
body: &W, body: &W,
expect_resp: Option<Hash>) expect_resp: Option<Hash>,
-> Result<(), Error> { ) -> Result<(), Error> {
self.conn.borrow().send_request(t, rt, body, expect_resp) self.conn.borrow().send_request(t, rt, body, expect_resp)
} }
} }
fn handle_payload(adapter: &NetAdapter, fn handle_payload(
adapter: &NetAdapter,
sender: UnboundedSender<Vec<u8>>, sender: UnboundedSender<Vec<u8>>,
header: MsgHeader, header: MsgHeader,
buf: Vec<u8>) buf: Vec<u8>,
-> Result<Option<Hash>, ser::Error> { ) -> Result<Option<Hash>, ser::Error> {
match header.msg_type { match header.msg_type {
Type::Ping => { Type::Ping => {
let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?; let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?;
@ -171,7 +169,10 @@ fn handle_payload(adapter: &NetAdapter,
// serialize and send all the headers over // serialize and send all the headers over
let mut body_data = vec![]; let mut body_data = vec![];
try!(ser::serialize(&mut body_data, &Headers { headers: headers })); try!(ser::serialize(
&mut body_data,
&Headers { headers: headers },
));
let mut data = vec![]; let mut data = vec![];
try!(ser::serialize( try!(ser::serialize(
&mut data, &mut data,

View file

@ -77,7 +77,10 @@ impl<R: AsyncRead> io::Read for ThrottledReader<R> {
// Check if Allowed // Check if Allowed
if self.allowed < 1 { if self.allowed < 1 {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "Reached Allowed Read Limit")); return Err(io::Error::new(
io::ErrorKind::WouldBlock,
"Reached Allowed Read Limit",
));
} }
// Read Max Allowed // Read Max Allowed
@ -155,7 +158,10 @@ impl<W: AsyncWrite> io::Write for ThrottledWriter<W> {
// Check if Allowed // Check if Allowed
if self.allowed < 1 { if self.allowed < 1 {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "Reached Allowed Write Limit")); return Err(io::Error::new(
io::ErrorKind::WouldBlock,
"Reached Allowed Write Limit",
));
} }
// Write max allowed // Write max allowed

View file

@ -104,14 +104,14 @@ impl Server {
let peers = peers.clone(); let peers = peers.clone();
// accept the peer and add it to the server map // accept the peer and add it to the server map
let accept = Peer::accept(conn, capab, total_diff, &hs.clone()); let accept = Peer::accept(conn, capab, total_diff, &hs.clone(), adapter.clone());
let added = add_to_peers(peers, adapter.clone(), accept); let added = add_to_peers(peers, adapter, accept);
// wire in a future to timeout the accept after 5 secs // wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(added), &hp); let timed_peer = with_timeout(Box::new(added), &hp);
// run the main peer protocol // run the main peer protocol
timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn, adapter)) timed_peer.and_then(move |(conn, peer)| peer.clone().run(conn))
}); });
// spawn each peer future to its own task // spawn each peer future to its own task
@ -144,10 +144,11 @@ impl Server {
} }
/// Asks the server to connect to a new peer. /// Asks the server to connect to a new peer.
pub fn connect_peer(&self, pub fn connect_peer(
&self,
addr: SocketAddr, addr: SocketAddr,
h: reactor::Handle) h: reactor::Handle,
-> Box<Future<Item = Option<Arc<Peer>>, Error = Error>> { ) -> Box<Future<Item = Option<Arc<Peer>>, Error = Error>> {
if let Some(p) = self.get_peer(addr) { if let Some(p) = self.get_peer(addr) {
// if we're already connected to the addr, just return the peer // if we're already connected to the addr, just return the peer
return Box::new(future::ok(Some(p))); return Box::new(future::ok(Some(p)));
@ -159,8 +160,7 @@ impl Server {
// cloneapalooza // cloneapalooza
let peers = self.peers.clone(); let peers = self.peers.clone();
let adapter1 = self.adapter.clone(); let adapter = self.adapter.clone();
let adapter2 = self.adapter.clone();
let capab = self.capabilities.clone(); let capab = self.capabilities.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port); let self_addr = SocketAddr::new(self.config.host, self.config.port);
@ -171,17 +171,23 @@ impl Server {
let request = socket let request = socket
.and_then(move |socket| { .and_then(move |socket| {
let peers = peers.clone(); let peers = peers.clone();
let total_diff = adapter1.clone().total_difficulty(); let total_diff = adapter.clone().total_difficulty();
// connect to the peer and add it to the server map, wiring it a timeout for // connect to the peer and add it to the server map, wiring it a timeout for
// the handhake // the handhake
let connect = let connect = Peer::connect(
Peer::connect(socket, capab, total_diff, self_addr, &Handshake::new()); socket,
let added = add_to_peers(peers, adapter1, connect); capab,
total_diff,
self_addr,
&Handshake::new(),
adapter.clone(),
);
let added = add_to_peers(peers, adapter, connect);
with_timeout(Box::new(added), &h) with_timeout(Box::new(added), &h)
}) })
.and_then(move |(socket, peer)| { .and_then(move |(socket, peer)| {
h2.spawn(peer.run(socket, adapter2).map_err(|e| { h2.spawn(peer.run(socket).map_err(|e| {
error!(LOGGER, "Peer error: {:?}", e); error!(LOGGER, "Peer error: {:?}", e);
() ()
})); }));
@ -300,11 +306,13 @@ impl Server {
} }
// Adds the peer built by the provided future in the peers map // Adds the peer built by the provided future in the peers map
fn add_to_peers<A>(peers: Arc<RwLock<Vec<Arc<Peer>>>>, fn add_to_peers<A>(
peers: Arc<RwLock<Vec<Arc<Peer>>>>,
adapter: Arc<NetAdapter>, adapter: Arc<NetAdapter>,
peer_fut: A) peer_fut: A,
-> Box<Future<Item = Result<(TcpStream, Arc<Peer>), ()>, Error = Error>> ) -> Box<Future<Item = Result<(TcpStream, Arc<Peer>), ()>, Error = Error>>
where A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static where
A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static,
{ {
let peer_add = peer_fut.into_future().map(move |(conn, peer)| { let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
adapter.peer_connected(&peer.info); adapter.peer_connected(&peer.info);
@ -317,15 +325,17 @@ fn add_to_peers<A>(peers: Arc<RwLock<Vec<Arc<Peer>>>>,
} }
// Adds a timeout to a future // Adds a timeout to a future
fn with_timeout<T: 'static>(fut: Box<Future<Item = Result<T, ()>, Error = Error>>, fn with_timeout<T: 'static>(
h: &reactor::Handle) fut: Box<Future<Item = Result<T, ()>, Error = Error>>,
-> Box<Future<Item = T, Error = Error>> { h: &reactor::Handle,
) -> Box<Future<Item = T, Error = Error>> {
let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap(); let timeout = reactor::Timeout::new(Duration::new(5, 0), h).unwrap();
let timed = fut.select(timeout.map(Err).from_err()) let timed = fut.select(timeout.map(Err).from_err()).then(
.then(|res| match res { |res| match res {
Ok((Ok(inner), _timeout)) => Ok(inner), Ok((Ok(inner), _timeout)) => Ok(inner),
Ok((_, _accept)) => Err(Error::Timeout), Ok((_, _accept)) => Err(Error::Timeout),
Err((e, _other)) => Err(e), Err((e, _other)) => Err(e),
}); },
);
Box::new(timed) Box::new(timed)
} }

View file

@ -67,10 +67,10 @@ impl Readable for PeerData {
fn read(reader: &mut Reader) -> Result<PeerData, ser::Error> { fn read(reader: &mut Reader) -> Result<PeerData, ser::Error> {
let addr = SockAddr::read(reader)?; let addr = SockAddr::read(reader)?;
let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8);
let user_agent = String::from_utf8(ua) let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
.map_err(|_| ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(
let capabilities = Capabilities::from_bits(capab) ser::Error::CorruptedData,
.ok_or(ser::Error::CorruptedData)?; )?;
match State::from_u8(fl) { match State::from_u8(fl) {
Some(flags) => { Some(flags) => {
Ok(PeerData { Ok(PeerData {
@ -109,18 +109,22 @@ impl PeerStore {
} }
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> { pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
self.db self.db.exists(
.exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) &to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..],
)
} }
pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> {
self.db self.db.delete(
.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) &to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..],
)
} }
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> { pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
let peers_iter = self.db let peers_iter = self.db.iter::<PeerData>(&to_key(
.iter::<PeerData>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); PEER_PREFIX,
&mut "".to_string().into_bytes(),
));
let mut peers = Vec::with_capacity(count); let mut peers = Vec::with_capacity(count);
for p in peers_iter { for p in peers_iter {
if p.flags == state && p.capabilities.contains(cap) { if p.flags == state && p.capabilities.contains(cap) {

View file

@ -61,10 +61,11 @@ fn peer_handshake() {
Difficulty::one(), Difficulty::one(),
my_addr, my_addr,
&p2p::handshake::Handshake::new(), &p2p::handshake::Handshake::new(),
net_adapter.clone(),
) )
}) })
.and_then(move |(socket, peer)| { .and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { rhandle.spawn(peer.run(socket).map_err(|e| {
panic!("Client run failed: {:?}", e); panic!("Client run failed: {:?}", e);
})); }));
peer.send_ping().unwrap(); peer.send_ping().unwrap();