From ea425dc6149e778f0f8f05cb488a5ac794dfd87c Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Sat, 5 Nov 2016 16:31:45 -0700 Subject: [PATCH] Protocol cleanup. Tests cleanup. Additional test for transaction broadcast. --- core/src/lib.rs | 1 - p2p/Cargo.toml | 1 + p2p/src/lib.rs | 6 +-- p2p/src/msg.rs | 2 +- p2p/src/protocol.rs | 86 +++++++++++++++++++++------------------ p2p/src/server.rs | 25 ++++++------ p2p/tests/network_conn.rs | 44 ++++++++------------ 7 files changed, 80 insertions(+), 85 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 9d234104d..aef09db2b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -35,4 +35,3 @@ pub mod core; pub mod genesis; pub mod pow; pub mod ser; -// mod chain; diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index f0062bd01..4d443a646 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -17,3 +17,4 @@ grin_core = { path = "../core" } [dev-dependencies] env_logger = "^0.3" +secp256k1zkp = { path = "../secp256k1zkp" } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 7277c59cf..43665f6e9 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -36,11 +36,11 @@ extern crate num; mod types; mod msg; -mod handshake; +pub mod handshake; mod rw; mod protocol; mod server; mod peer; -pub use server::{Server, DummyAdapter}; -pub use server::DEFAULT_LISTEN_ADDR; +pub use server::{Server, DummyAdapter, DEFAULT_LISTEN_ADDR}; +pub use peer::Peer; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 7b3116e45..39917c6b4 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -37,7 +37,7 @@ pub enum ErrCodes { /// Types of messages enum_from_primitive! { - #[derive(Clone, Copy)] + #[derive(Debug, Clone, Copy)] pub enum Type { Error, Hand, diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 95d73934d..bf0d9f517 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -66,7 +66,7 @@ impl ProtocolV1 { stop_send: RefCell::new(None), sent_bytes: Mutex::new(0), received_bytes: Mutex::new(0), - error_count: Mutex::new(0), + error_count: Mutex::new(0), } } } @@ -79,14 +79,14 @@ impl Protocol for ProtocolV1 { /// within a coroutine. fn handle(&self, adapter: &NetAdapter) -> Result<(), ser::Error> { // setup channels so we can switch between reads, writes and close - let (msg_recv, stop_recv) = self.setup_channels(); + let (msg_recv, stop_recv) = self.setup_channels(); let mut conn = self.conn.borrow_mut(); loop { // main select loop, switches between listening, sending or stopping select!( r:conn => { - let res = self.read_msg(&mut conn, adapter); + let res = self.process_msg(&mut conn, adapter); if let Err(_) = res { let mut cnt = self.error_count.lock().unwrap(); *cnt += 1; @@ -133,8 +133,8 @@ impl Protocol for ProtocolV1 { /// Bytes sent and received by this peer to the remote peer. fn transmitted_bytes(&self) -> (u64, u64) { - let sent = *self.sent_bytes.lock().unwrap().deref(); - let received = *self.received_bytes.lock().unwrap().deref(); + let sent = *self.sent_bytes.lock().unwrap().deref(); + let received = *self.received_bytes.lock().unwrap().deref(); (sent, received) } @@ -146,41 +146,46 @@ impl Protocol for ProtocolV1 { } impl ProtocolV1 { - fn read_msg(&self, mut conn: &mut TcpStream, adapter: &NetAdapter) -> Result<(), ser::Error> { - // deser the header to get the message type - let header = try!(ser::deserialize::(conn.deref_mut())); - if !header.acceptable() { - return Err(ser::Error::CorruptedData); - } + /// Reads a message from the peer tcp stream and process it, usually simply + /// forwarding to the adapter. + fn process_msg(&self, + mut conn: &mut TcpStream, + adapter: &NetAdapter) + -> Result<(), ser::Error> { + // deser the header to get the message type + let header = try!(ser::deserialize::(conn.deref_mut())); + if !header.acceptable() { + return Err(ser::Error::CorruptedData); + } - // wrap our connection with limited byte-counting readers - let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES); - let mut read_conn = rw::CountingRead::new(&mut limit_conn); + // wrap our connection with limited byte-counting readers + let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES); + let mut read_conn = rw::CountingRead::new(&mut limit_conn); - // check the message type and hopefully do what's expected with it - match header.msg_type { - Type::Ping => { - // respond with pong - try!(self.send_pong()); - }, - Type::Pong => {}, - Type::Transaction => { - let tx = try!(ser::deserialize(&mut read_conn)); - adapter.transaction_received(tx); - }, - Type::Block => { - let b = try!(ser::deserialize(&mut read_conn)); - adapter.block_received(b); - } - _ => error!("uncaught unknown"), - } + // check the message type and hopefully do what's expected with it + match header.msg_type { + Type::Ping => { + // respond with pong + try!(self.send_pong()); + } + Type::Pong => {} + Type::Transaction => { + let tx = try!(ser::deserialize(&mut read_conn)); + adapter.transaction_received(tx); + } + Type::Block => { + let b = try!(ser::deserialize(&mut read_conn)); + adapter.block_received(b); + } + _ => error!("uncaught unknown"), + } - // update total of bytes sent - let mut sent_bytes = self.sent_bytes.lock().unwrap(); - *sent_bytes += header.serialized_len() + (read_conn.bytes_read() as u64); + // update total of bytes sent + let mut received_bytes = self.received_bytes.lock().unwrap(); + *received_bytes += header.serialized_len() + (read_conn.bytes_read() as u64); - Ok(()) - } + Ok(()) + } /// Helper function to avoid boilerplate, builds a header followed by the /// Writeable body and send the whole thing. @@ -194,6 +199,7 @@ impl ProtocolV1 { Ok(()) } + /// Sends a pong message (usually in reply to ping) fn send_pong(&self) -> Result<(), ser::Error> { let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong))); let msg_send = self.msg_send.borrow(); @@ -201,8 +207,8 @@ impl ProtocolV1 { Ok(()) } - /// Setup internal communication channels to select over - fn setup_channels(&self) -> (Receiver>, Receiver) { + /// Setup internal communication channels to select over + fn setup_channels(&self) -> (Receiver>, Receiver) { let (msg_send, msg_recv) = sync_channel(10); let (stop_send, stop_recv) = sync_channel(1); { @@ -211,6 +217,6 @@ impl ProtocolV1 { let mut stop_mut = self.stop_send.borrow_mut(); *stop_mut = Some(stop_send); } - (msg_recv, stop_recv) - } + (msg_recv, stop_recv) + } } diff --git a/p2p/src/server.rs b/p2p/src/server.rs index f542e4e0b..0cce6acaa 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -15,6 +15,7 @@ //! Grin server implementation, accepts incoming connections and connects to //! other peers in the network. +use rand::{self, Rng}; use std::cell::RefCell; use std::io; use std::net::SocketAddr; @@ -91,7 +92,6 @@ impl Server { { let mut peers = self.peers.write().unwrap(); peers.push(wpeer.clone()); - println!("len {}", peers.len()) } mioco::spawn(move || -> io::Result<()> { @@ -131,6 +131,18 @@ impl Server { Ok(()) } + /// Number of peers this server is connected to. + pub fn peers_count(&self) -> u32 { + self.peers.read().unwrap().len() as u32 + } + + /// Gets a random peer from our set of connected peers. + pub fn get_any_peer(&self) -> Arc { + let mut rng = rand::thread_rng(); + let peers = self.peers.read().unwrap(); + peers[rng.gen_range(0, peers.len())].clone() + } + /// Stops the server. Disconnect from all peers at the same time. pub fn stop(&self) { let peers = self.peers.write().unwrap(); @@ -140,15 +152,4 @@ impl Server { let stop_send = self.stop_send.borrow(); stop_send.as_ref().unwrap().send(0); } - - /// Simulates an unrelated client connecting to our server. Mostly used for - /// tests. - pub fn connect_as_client(addr: SocketAddr) -> Result { - let tcp_client = TcpStream::connect(&addr).unwrap(); - Peer::accept(tcp_client, &Handshake::new()) - } - - pub fn peers_count(&self) -> u32 { - self.peers.read().unwrap().len() as u32 - } } diff --git a/p2p/tests/network_conn.rs b/p2p/tests/network_conn.rs index 5006aaaa4..7f67f2e4b 100644 --- a/p2p/tests/network_conn.rs +++ b/p2p/tests/network_conn.rs @@ -12,53 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. +extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate mioco; extern crate env_logger; +mod common; + use std::io; -use std::sync::Arc; use std::time; +use core::core::*; +use p2p::Peer; +use common::*; + +// Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live. #[test] fn peer_handshake() { env_logger::init().unwrap(); - mioco::start(|| -> io::Result<()> { - // start a server in its own coroutine - let server = Arc::new(p2p::Server::new()); - let in_server = server.clone(); - mioco::spawn(move || -> io::Result<()> { - try!(in_server.start().map_err(|_| io::Error::last_os_error())); - Ok(()) - }); - - // giving server a little time to start - mioco::sleep(time::Duration::from_millis(50)); - + with_server(|server| -> io::Result<()> { // connect a client peer to the server - let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap(); - let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error())); - mioco::sleep(time::Duration::from_millis(50)); - assert_eq!(server.peers_count(), 1); + let peer = try!(connect_peer()); - // spawn our client peer to its own coroutine so it can poll for replies - let peer = Arc::new(peer); - let in_peer = peer.clone(); - mioco::spawn(move || -> io::Result<()> { - in_peer.run(&p2p::DummyAdapter{}); - Ok(()) - }); - mioco::sleep(time::Duration::from_millis(50)); + // check server peer count + let pc = server.peers_count(); + assert_eq!(pc, 1); - // send a ping and check we got ponged + // send a ping and check we got ponged (received data back) peer.send_ping(); mioco::sleep(time::Duration::from_millis(50)); let (sent, recv) = peer.transmitted_bytes(); assert!(sent > 0); assert!(recv > 0); - server.stop(); + peer.stop(); Ok(()) - }).unwrap().unwrap(); + }); }