Protocol cleanup. Tests cleanup. Additional test for transaction broadcast.

This commit is contained in:
Ignotus Peverell 2016-11-05 16:31:45 -07:00
parent 28f007240e
commit ea425dc614
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
7 changed files with 80 additions and 85 deletions

View file

@ -35,4 +35,3 @@ pub mod core;
pub mod genesis;
pub mod pow;
pub mod ser;
// mod chain;

View file

@ -17,3 +17,4 @@ grin_core = { path = "../core" }
[dev-dependencies]
env_logger = "^0.3"
secp256k1zkp = { path = "../secp256k1zkp" }

View file

@ -36,11 +36,11 @@ extern crate num;
mod types;
mod msg;
mod handshake;
pub mod handshake;
mod rw;
mod protocol;
mod server;
mod peer;
pub use server::{Server, DummyAdapter};
pub use server::DEFAULT_LISTEN_ADDR;
pub use server::{Server, DummyAdapter, DEFAULT_LISTEN_ADDR};
pub use peer::Peer;

View file

@ -37,7 +37,7 @@ pub enum ErrCodes {
/// Types of messages
enum_from_primitive! {
#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy)]
pub enum Type {
Error,
Hand,

View file

@ -66,7 +66,7 @@ impl ProtocolV1 {
stop_send: RefCell::new(None),
sent_bytes: Mutex::new(0),
received_bytes: Mutex::new(0),
error_count: Mutex::new(0),
error_count: Mutex::new(0),
}
}
}
@ -79,14 +79,14 @@ impl Protocol for ProtocolV1 {
/// within a coroutine.
fn handle(&self, adapter: &NetAdapter) -> Result<(), ser::Error> {
// setup channels so we can switch between reads, writes and close
let (msg_recv, stop_recv) = self.setup_channels();
let (msg_recv, stop_recv) = self.setup_channels();
let mut conn = self.conn.borrow_mut();
loop {
// main select loop, switches between listening, sending or stopping
select!(
r:conn => {
let res = self.read_msg(&mut conn, adapter);
let res = self.process_msg(&mut conn, adapter);
if let Err(_) = res {
let mut cnt = self.error_count.lock().unwrap();
*cnt += 1;
@ -133,8 +133,8 @@ impl Protocol for ProtocolV1 {
/// Bytes sent and received by this peer to the remote peer.
fn transmitted_bytes(&self) -> (u64, u64) {
let sent = *self.sent_bytes.lock().unwrap().deref();
let received = *self.received_bytes.lock().unwrap().deref();
let sent = *self.sent_bytes.lock().unwrap().deref();
let received = *self.received_bytes.lock().unwrap().deref();
(sent, received)
}
@ -146,41 +146,46 @@ impl Protocol for ProtocolV1 {
}
impl ProtocolV1 {
fn read_msg(&self, mut conn: &mut TcpStream, adapter: &NetAdapter) -> Result<(), ser::Error> {
// deser the header to get the message type
let header = try!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() {
return Err(ser::Error::CorruptedData);
}
/// Reads a message from the peer tcp stream and process it, usually simply
/// forwarding to the adapter.
fn process_msg(&self,
mut conn: &mut TcpStream,
adapter: &NetAdapter)
-> Result<(), ser::Error> {
// deser the header to get the message type
let header = try!(ser::deserialize::<MsgHeader>(conn.deref_mut()));
if !header.acceptable() {
return Err(ser::Error::CorruptedData);
}
// wrap our connection with limited byte-counting readers
let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES);
let mut read_conn = rw::CountingRead::new(&mut limit_conn);
// wrap our connection with limited byte-counting readers
let mut limit_conn = rw::LimitedRead::new(conn.deref_mut(), MAX_DATA_BYTES);
let mut read_conn = rw::CountingRead::new(&mut limit_conn);
// check the message type and hopefully do what's expected with it
match header.msg_type {
Type::Ping => {
// respond with pong
try!(self.send_pong());
},
Type::Pong => {},
Type::Transaction => {
let tx = try!(ser::deserialize(&mut read_conn));
adapter.transaction_received(tx);
},
Type::Block => {
let b = try!(ser::deserialize(&mut read_conn));
adapter.block_received(b);
}
_ => error!("uncaught unknown"),
}
// check the message type and hopefully do what's expected with it
match header.msg_type {
Type::Ping => {
// respond with pong
try!(self.send_pong());
}
Type::Pong => {}
Type::Transaction => {
let tx = try!(ser::deserialize(&mut read_conn));
adapter.transaction_received(tx);
}
Type::Block => {
let b = try!(ser::deserialize(&mut read_conn));
adapter.block_received(b);
}
_ => error!("uncaught unknown"),
}
// update total of bytes sent
let mut sent_bytes = self.sent_bytes.lock().unwrap();
*sent_bytes += header.serialized_len() + (read_conn.bytes_read() as u64);
// update total of bytes sent
let mut received_bytes = self.received_bytes.lock().unwrap();
*received_bytes += header.serialized_len() + (read_conn.bytes_read() as u64);
Ok(())
}
Ok(())
}
/// Helper function to avoid boilerplate, builds a header followed by the
/// Writeable body and send the whole thing.
@ -194,6 +199,7 @@ impl ProtocolV1 {
Ok(())
}
/// Sends a pong message (usually in reply to ping)
fn send_pong(&self) -> Result<(), ser::Error> {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong)));
let msg_send = self.msg_send.borrow();
@ -201,8 +207,8 @@ impl ProtocolV1 {
Ok(())
}
/// Setup internal communication channels to select over
fn setup_channels(&self) -> (Receiver<Vec<u8>>, Receiver<u8>) {
/// Setup internal communication channels to select over
fn setup_channels(&self) -> (Receiver<Vec<u8>>, Receiver<u8>) {
let (msg_send, msg_recv) = sync_channel(10);
let (stop_send, stop_recv) = sync_channel(1);
{
@ -211,6 +217,6 @@ impl ProtocolV1 {
let mut stop_mut = self.stop_send.borrow_mut();
*stop_mut = Some(stop_send);
}
(msg_recv, stop_recv)
}
(msg_recv, stop_recv)
}
}

View file

@ -15,6 +15,7 @@
//! Grin server implementation, accepts incoming connections and connects to
//! other peers in the network.
use rand::{self, Rng};
use std::cell::RefCell;
use std::io;
use std::net::SocketAddr;
@ -91,7 +92,6 @@ impl Server {
{
let mut peers = self.peers.write().unwrap();
peers.push(wpeer.clone());
println!("len {}", peers.len())
}
mioco::spawn(move || -> io::Result<()> {
@ -131,6 +131,18 @@ impl Server {
Ok(())
}
/// Number of peers this server is connected to.
pub fn peers_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32
}
/// Gets a random peer from our set of connected peers.
pub fn get_any_peer(&self) -> Arc<Peer> {
let mut rng = rand::thread_rng();
let peers = self.peers.read().unwrap();
peers[rng.gen_range(0, peers.len())].clone()
}
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(&self) {
let peers = self.peers.write().unwrap();
@ -140,15 +152,4 @@ impl Server {
let stop_send = self.stop_send.borrow();
stop_send.as_ref().unwrap().send(0);
}
/// Simulates an unrelated client connecting to our server. Mostly used for
/// tests.
pub fn connect_as_client(addr: SocketAddr) -> Result<Peer, Error> {
let tcp_client = TcpStream::connect(&addr).unwrap();
Peer::accept(tcp_client, &Handshake::new())
}
pub fn peers_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32
}
}

View file

@ -12,53 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate mioco;
extern crate env_logger;
mod common;
use std::io;
use std::sync::Arc;
use std::time;
use core::core::*;
use p2p::Peer;
use common::*;
// Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live.
#[test]
fn peer_handshake() {
env_logger::init().unwrap();
mioco::start(|| -> io::Result<()> {
// start a server in its own coroutine
let server = Arc::new(p2p::Server::new());
let in_server = server.clone();
mioco::spawn(move || -> io::Result<()> {
try!(in_server.start().map_err(|_| io::Error::last_os_error()));
Ok(())
});
// giving server a little time to start
mioco::sleep(time::Duration::from_millis(50));
with_server(|server| -> io::Result<()> {
// connect a client peer to the server
let addr = p2p::DEFAULT_LISTEN_ADDR.parse().unwrap();
let peer = try!(p2p::Server::connect_as_client(addr).map_err(|_| io::Error::last_os_error()));
mioco::sleep(time::Duration::from_millis(50));
assert_eq!(server.peers_count(), 1);
let peer = try!(connect_peer());
// spawn our client peer to its own coroutine so it can poll for replies
let peer = Arc::new(peer);
let in_peer = peer.clone();
mioco::spawn(move || -> io::Result<()> {
in_peer.run(&p2p::DummyAdapter{});
Ok(())
});
mioco::sleep(time::Duration::from_millis(50));
// check server peer count
let pc = server.peers_count();
assert_eq!(pc, 1);
// send a ping and check we got ponged
// send a ping and check we got ponged (received data back)
peer.send_ping();
mioco::sleep(time::Duration::from_millis(50));
let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0);
assert!(recv > 0);
server.stop();
peer.stop();
Ok(())
}).unwrap().unwrap();
});
}