Added height as part of handshake to inform syncing.

This commit is contained in:
Ignotus Peverell 2017-02-02 15:51:48 -08:00
parent 7f029cb4c0
commit 2d81abc16c
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
8 changed files with 42 additions and 11 deletions

View file

@ -30,6 +30,9 @@ pub struct NetToChainAdapter {
}
impl NetAdapter for NetToChainAdapter {
fn height(&self) -> u64 {
self.chain_head.lock().unwrap().height
}
fn transaction_received(&self, tx: core::Transaction) {
unimplemented!();
}

View file

@ -272,7 +272,7 @@ impl TimeoutConnection {
underlying: conn,
expected_responses: expects,
};
(me, Box::new(fut.join(timer).map(|_| ())))
(me, Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, e2)| e1)))
}
/// Sends a request and registers a timer on the provided message type and

View file

@ -46,6 +46,7 @@ impl Handshake {
/// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect(&self,
height: u64,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
// prepare the first part of the hanshake
@ -54,6 +55,7 @@ impl Handshake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
nonce: nonce,
height: height,
sender_addr: SockAddr(conn.local_addr().unwrap()),
receiver_addr: SockAddr(conn.peer_addr().unwrap()),
user_agent: USER_AGENT.to_string(),
@ -74,6 +76,7 @@ impl Handshake {
user_agent: shake.user_agent,
addr: conn.peer_addr().unwrap(),
version: shake.version,
height: shake.height,
};
info!("Connected to peer {:?}", peer_info);
@ -86,6 +89,7 @@ impl Handshake {
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake(&self,
height: u64,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonces = self.nonces.clone();
@ -113,11 +117,13 @@ impl Handshake {
user_agent: hand.user_agent,
addr: conn.peer_addr().unwrap(),
version: hand.version,
height: hand.height,
};
// send our reply with our info
let shake = Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
height: height,
user_agent: USER_AGENT.to_string(),
};
Ok((conn, shake, peer_info))

View file

@ -117,11 +117,14 @@ pub fn write_msg<T>(conn: TcpStream,
/// Header of any protocol message, used to identify incoming messages.
pub struct MsgHeader {
magic: [u8; 2],
/// Type of the message.
pub msg_type: Type,
/// Tota length of the message in bytes.
pub msg_len: u64,
}
impl MsgHeader {
/// Creates a new message header.
pub fn new(msg_type: Type, len: u64) -> MsgHeader {
MsgHeader {
magic: MAGIC,
@ -174,6 +177,8 @@ pub struct Hand {
pub capabilities: Capabilities,
/// randomly generated for each handshake, helps detect self
pub nonce: u64,
/// current height of the sender, used to check whether sync may be needed
pub height: u64,
/// network address of the sender
pub sender_addr: SockAddr,
/// network address of the receiver
@ -187,7 +192,8 @@ impl Writeable for Hand {
ser_multiwrite!(writer,
[write_u32, self.version],
[write_u32, self.capabilities.bits()],
[write_u64, self.nonce]);
[write_u64, self.nonce],
[write_u64, self.height]);
self.sender_addr.write(writer);
self.receiver_addr.write(writer);
writer.write_bytes(&self.user_agent)
@ -196,7 +202,7 @@ impl Writeable for Hand {
impl Readable<Hand> for Hand {
fn read(reader: &mut Reader) -> Result<Hand, ser::Error> {
let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64);
let (version, capab, nonce, height) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_u64);
let sender_addr = try!(SockAddr::read(reader));
let receiver_addr = try!(SockAddr::read(reader));
let ua = try!(reader.read_vec());
@ -206,6 +212,7 @@ impl Readable<Hand> for Hand {
version: version,
capabilities: capabilities,
nonce: nonce,
height: height,
sender_addr: sender_addr,
receiver_addr: receiver_addr,
user_agent: user_agent,
@ -220,6 +227,8 @@ pub struct Shake {
pub version: u32,
/// sender capabilities
pub capabilities: Capabilities,
/// current height of the sender, used to check whether sync may be needed
pub height: u64,
/// name of version of the software
pub user_agent: String,
}
@ -229,6 +238,7 @@ impl Writeable for Shake {
ser_multiwrite!(writer,
[write_u32, self.version],
[write_u32, self.capabilities.bits()],
[write_u64, self.height],
[write_bytes, &self.user_agent]);
Ok(())
}
@ -236,12 +246,13 @@ impl Writeable for Shake {
impl Readable<Shake> for Shake {
fn read(reader: &mut Reader) -> Result<Shake, ser::Error> {
let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec);
let (version, capab, height, ua) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_vec);
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(Shake {
version: version,
capabilities: capabilities,
height: height,
user_agent: user_agent,
})
}

View file

@ -33,9 +33,10 @@ unsafe impl Send for Peer {}
impl Peer {
/// Initiates the handshake with another peer.
pub fn connect(conn: TcpStream,
height: u64,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(conn).and_then(|(conn, proto, info)| {
let connect_peer = hs.connect(height, conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
@ -47,9 +48,10 @@ impl Peer {
/// Accept a handshake initiated by another peer.
pub fn accept(conn: TcpStream,
height: u64,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let hs_peer = hs.handshake(conn).and_then(|(conn, proto, info)| {
let hs_peer = hs.handshake(height, conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
@ -65,6 +67,7 @@ impl Peer {
conn: TcpStream,
na: Arc<NetAdapter>)
-> Box<Future<Item = (), Error = Error>> {
self.proto.handle(conn, na)
}

View file

@ -36,6 +36,7 @@ use types::*;
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl NetAdapter for DummyAdapter {
fn height(&self) -> u64 { 0 }
fn transaction_received(&self, tx: core::Transaction) {}
fn block_received(&self, b: core::Block) {}
}
@ -79,10 +80,11 @@ impl Server {
let hp = h.clone();
let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| {
let adapter = adapter.clone();
let height = adapter.height();
let peers = peers.clone();
// accept the peer and add it to the server map
let peer_accept = add_to_peers(peers, Peer::accept(conn, &hs.clone()));
let peer_accept = add_to_peers(peers, Peer::accept(conn, height, &hs.clone()));
// wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(peer_accept), &hp);
@ -124,18 +126,20 @@ impl Server {
h: reactor::Handle)
-> Box<Future<Item = (), Error = Error>> {
let peers = self.peers.clone();
let adapter = self.adapter.clone();
let adapter1 = self.adapter.clone();
let adapter2 = self.adapter.clone();
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e));
let request = socket.and_then(move |socket| {
let peers = peers.clone();
let height = adapter1.height();
// connect to the peer and add it to the server map, wiring it a timeout for
// the handhake
let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new()));
let peer_connect = add_to_peers(peers, Peer::connect(socket, height, &Handshake::new()));
with_timeout(Box::new(peer_connect), &h)
})
.and_then(move |(socket, peer)| peer.run(socket, adapter));
.and_then(move |(socket, peer)| peer.run(socket, adapter2));
Box::new(request)
}

View file

@ -56,6 +56,7 @@ pub struct PeerInfo {
pub user_agent: String,
pub version: u32,
pub addr: SocketAddr,
pub height: u64,
}
/// A given communication protocol agreed upon between 2 peers (usually
@ -91,6 +92,9 @@ pub trait Protocol {
/// forwarding or querying of blocks and transactions from the network among
/// other things.
pub trait NetAdapter: Sync + Send {
/// Current height of our chain.
fn height(&self) -> u64;
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction);

View file

@ -50,7 +50,7 @@ fn peer_handshake() {
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e));
socket.and_then(move |socket| {
Peer::connect(socket, &p2p::handshake::Handshake::new())
Peer::connect(socket, 0, &p2p::handshake::Handshake::new())
}).and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| {
panic!("Client run failed: {}", e);