diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 295e10424..e07dd5714 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -30,6 +30,9 @@ pub struct NetToChainAdapter { } impl NetAdapter for NetToChainAdapter { + fn height(&self) -> u64 { + self.chain_head.lock().unwrap().height + } fn transaction_received(&self, tx: core::Transaction) { unimplemented!(); } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 0ff95a01a..126be5fd5 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -272,7 +272,7 @@ impl TimeoutConnection { underlying: conn, expected_responses: expects, }; - (me, Box::new(fut.join(timer).map(|_| ()))) + (me, Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, e2)| e1))) } /// Sends a request and registers a timer on the provided message type and diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index e644c95ca..884e0ee7e 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -46,6 +46,7 @@ impl Handshake { /// Handles connecting to a new remote peer, starting the version handshake. pub fn connect(&self, + height: u64, conn: TcpStream) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> { // prepare the first part of the hanshake @@ -54,6 +55,7 @@ impl Handshake { version: PROTOCOL_VERSION, capabilities: FULL_SYNC, nonce: nonce, + height: height, sender_addr: SockAddr(conn.local_addr().unwrap()), receiver_addr: SockAddr(conn.peer_addr().unwrap()), user_agent: USER_AGENT.to_string(), @@ -74,6 +76,7 @@ impl Handshake { user_agent: shake.user_agent, addr: conn.peer_addr().unwrap(), version: shake.version, + height: shake.height, }; info!("Connected to peer {:?}", peer_info); @@ -86,6 +89,7 @@ impl Handshake { /// Handles receiving a connection from a new remote peer that started the /// version handshake. pub fn handshake(&self, + height: u64, conn: TcpStream) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> { let nonces = self.nonces.clone(); @@ -113,11 +117,13 @@ impl Handshake { user_agent: hand.user_agent, addr: conn.peer_addr().unwrap(), version: hand.version, + height: hand.height, }; // send our reply with our info let shake = Shake { version: PROTOCOL_VERSION, capabilities: FULL_SYNC, + height: height, user_agent: USER_AGENT.to_string(), }; Ok((conn, shake, peer_info)) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 094898af6..2e42554aa 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -117,11 +117,14 @@ pub fn write_msg<T>(conn: TcpStream, /// Header of any protocol message, used to identify incoming messages. pub struct MsgHeader { magic: [u8; 2], + /// Type of the message. pub msg_type: Type, + /// Tota length of the message in bytes. pub msg_len: u64, } impl MsgHeader { + /// Creates a new message header. pub fn new(msg_type: Type, len: u64) -> MsgHeader { MsgHeader { magic: MAGIC, @@ -174,6 +177,8 @@ pub struct Hand { pub capabilities: Capabilities, /// randomly generated for each handshake, helps detect self pub nonce: u64, + /// current height of the sender, used to check whether sync may be needed + pub height: u64, /// network address of the sender pub sender_addr: SockAddr, /// network address of the receiver @@ -187,7 +192,8 @@ impl Writeable for Hand { ser_multiwrite!(writer, [write_u32, self.version], [write_u32, self.capabilities.bits()], - [write_u64, self.nonce]); + [write_u64, self.nonce], + [write_u64, self.height]); self.sender_addr.write(writer); self.receiver_addr.write(writer); writer.write_bytes(&self.user_agent) @@ -196,7 +202,7 @@ impl Writeable for Hand { impl Readable<Hand> for Hand { fn read(reader: &mut Reader) -> Result<Hand, ser::Error> { - let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64); + let (version, capab, nonce, height) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_u64); let sender_addr = try!(SockAddr::read(reader)); let receiver_addr = try!(SockAddr::read(reader)); let ua = try!(reader.read_vec()); @@ -206,6 +212,7 @@ impl Readable<Hand> for Hand { version: version, capabilities: capabilities, nonce: nonce, + height: height, sender_addr: sender_addr, receiver_addr: receiver_addr, user_agent: user_agent, @@ -220,6 +227,8 @@ pub struct Shake { pub version: u32, /// sender capabilities pub capabilities: Capabilities, + /// current height of the sender, used to check whether sync may be needed + pub height: u64, /// name of version of the software pub user_agent: String, } @@ -229,6 +238,7 @@ impl Writeable for Shake { ser_multiwrite!(writer, [write_u32, self.version], [write_u32, self.capabilities.bits()], + [write_u64, self.height], [write_bytes, &self.user_agent]); Ok(()) } @@ -236,12 +246,13 @@ impl Writeable for Shake { impl Readable<Shake> for Shake { fn read(reader: &mut Reader) -> Result<Shake, ser::Error> { - let (version, capab, ua) = ser_multiread!(reader, read_u32, read_u32, read_vec); + let (version, capab, height, ua) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_vec); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)); Ok(Shake { version: version, capabilities: capabilities, + height: height, user_agent: user_agent, }) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 73fe49184..ee5811562 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -33,9 +33,10 @@ unsafe impl Send for Peer {} impl Peer { /// Initiates the handshake with another peer. pub fn connect(conn: TcpStream, + height: u64, hs: &Handshake) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> { - let connect_peer = hs.connect(conn).and_then(|(conn, proto, info)| { + let connect_peer = hs.connect(height, conn).and_then(|(conn, proto, info)| { Ok((conn, Peer { info: info, @@ -47,9 +48,10 @@ impl Peer { /// Accept a handshake initiated by another peer. pub fn accept(conn: TcpStream, + height: u64, hs: &Handshake) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> { - let hs_peer = hs.handshake(conn).and_then(|(conn, proto, info)| { + let hs_peer = hs.handshake(height, conn).and_then(|(conn, proto, info)| { Ok((conn, Peer { info: info, @@ -65,6 +67,7 @@ impl Peer { conn: TcpStream, na: Arc<NetAdapter>) -> Box<Future<Item = (), Error = Error>> { + self.proto.handle(conn, na) } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index 19784da5e..8d839524f 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -36,6 +36,7 @@ use types::*; /// A no-op network adapter used for testing. pub struct DummyAdapter {} impl NetAdapter for DummyAdapter { + fn height(&self) -> u64 { 0 } fn transaction_received(&self, tx: core::Transaction) {} fn block_received(&self, b: core::Block) {} } @@ -79,10 +80,11 @@ impl Server { let hp = h.clone(); let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| { let adapter = adapter.clone(); + let height = adapter.height(); let peers = peers.clone(); // accept the peer and add it to the server map - let peer_accept = add_to_peers(peers, Peer::accept(conn, &hs.clone())); + let peer_accept = add_to_peers(peers, Peer::accept(conn, height, &hs.clone())); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(peer_accept), &hp); @@ -124,18 +126,20 @@ impl Server { h: reactor::Handle) -> Box<Future<Item = (), Error = Error>> { let peers = self.peers.clone(); - let adapter = self.adapter.clone(); + let adapter1 = self.adapter.clone(); + let adapter2 = self.adapter.clone(); let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); let request = socket.and_then(move |socket| { let peers = peers.clone(); + let height = adapter1.height(); // connect to the peer and add it to the server map, wiring it a timeout for // the handhake - let peer_connect = add_to_peers(peers, Peer::connect(socket, &Handshake::new())); + let peer_connect = add_to_peers(peers, Peer::connect(socket, height, &Handshake::new())); with_timeout(Box::new(peer_connect), &h) }) - .and_then(move |(socket, peer)| peer.run(socket, adapter)); + .and_then(move |(socket, peer)| peer.run(socket, adapter2)); Box::new(request) } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 773933d8c..2f4c1b266 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -56,6 +56,7 @@ pub struct PeerInfo { pub user_agent: String, pub version: u32, pub addr: SocketAddr, + pub height: u64, } /// A given communication protocol agreed upon between 2 peers (usually @@ -91,6 +92,9 @@ pub trait Protocol { /// forwarding or querying of blocks and transactions from the network among /// other things. pub trait NetAdapter: Sync + Send { + /// Current height of our chain. + fn height(&self) -> u64; + /// A valid transaction has been received from one of our peers fn transaction_received(&self, tx: core::Transaction); diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 8cb0df1d2..f4ea14c1a 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -50,7 +50,7 @@ fn peer_handshake() { let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port); let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e)); socket.and_then(move |socket| { - Peer::connect(socket, &p2p::handshake::Handshake::new()) + Peer::connect(socket, 0, &p2p::handshake::Handshake::new()) }).and_then(move |(socket, peer)| { rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| { panic!("Client run failed: {}", e);