[WIP] Rewrite peer-to-peer logic to replace tokio with simple threading ()

This commit is contained in:
Ignotus Peverell 2018-02-02 02:03:12 +00:00 committed by GitHub
parent 1ad6505130
commit c7418cfe04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1019 additions and 2027 deletions

View file

@ -16,16 +16,12 @@ grin_keychain = { path = "../keychain" }
grin_wallet = { path = "../wallet" }
grin_pow = { path = "../pow" }
futures = "^0.1.15"
futures-cpupool = "^0.1.3"
hyper = "~0.11.4"
hyper = "~0.10.6"
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
time = "^0.1"
serde = "~1.0.8"
serde_derive = "~1.0.8"
serde_json = "~1.0.7"
tokio-core="~0.1.1"
tokio-timer="~0.1.0"
rand = "^0.3"
router = "~0.5.1"
itertools = "~0.6.0"

View file

@ -21,8 +21,6 @@
#![deny(unused_mut)]
#![warn(missing_docs)]
extern crate futures;
extern crate futures_cpupool as cpupool;
extern crate hyper;
extern crate itertools;
extern crate rand;
@ -33,8 +31,6 @@ extern crate serde_json;
#[macro_use]
extern crate slog;
extern crate time;
extern crate tokio_core;
extern crate tokio_timer;
extern crate grin_api as api;
extern crate grin_chain as chain;

View file

@ -16,313 +16,219 @@
//! a mining worker implementation
//!
use std::io::Read;
use std::net::SocketAddr;
use std::str::{self, FromStr};
use std::sync::Arc;
use std::str;
use std::sync::{Arc, mpsc};
use std::time;
use cpupool;
use futures::{self, future, Future, Stream};
use futures::sync::mpsc;
use hyper;
use tokio_core::reactor;
use tokio_timer::Timer;
use std::thread;
use time::now_utc;
use hyper;
use p2p;
use util::LOGGER;
const BAN_WINDOW: i64 = 10800;
const PEER_MAX_COUNT: u32 = 25;
const PEER_PREFERRED_COUNT: u32 = 8;
const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt";
pub struct Seeder {
peers: p2p::Peers,
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
capabilities: p2p::Capabilities,
seed_list: Box<Fn() -> Vec<SocketAddr> + Send>,
) {
let _ = thread::Builder::new()
.name("seed".to_string())
.spawn(move || {
let peers = p2p_server.peers.clone();
// open a channel with a listener that connects every peer address sent below
// max peer count
let (tx, rx) = mpsc::channel();
// check seeds first
connect_to_seeds(peers.clone(), tx.clone(), seed_list);
loop {
// try to connect to any address sent to the channel
listen_for_addrs(peers.clone(), p2p_server.clone(), capabilities, &rx);
// monitor additional peers if we need to add more
monitor_peers(peers.clone(), capabilities, tx.clone());
thread::sleep(time::Duration::from_secs(20));
}
});
}
impl Seeder {
pub fn new(
capabilities: p2p::Capabilities,
p2p_server: Arc<p2p::Server>,
peers: p2p::Peers,
) -> Seeder {
Seeder {
peers: peers,
p2p_server: p2p_server,
capabilities: capabilities,
fn monitor_peers(
peers: p2p::Peers,
capabilities: p2p::Capabilities,
tx: mpsc::Sender<SocketAddr>,
) {
// regularly check if we need to acquire more peers and if so, gets
// them from db
let total_count = peers.all_peers().len();
debug!(
LOGGER,
"monitor_peers: {} most_work_peers, {} connected, {} total known",
peers.most_work_peers().len(),
peers.connected_peers().len(),
total_count,
);
let mut healthy_count = 0;
let mut banned_count = 0;
let mut defunct_count = 0;
for x in peers.all_peers() {
match x.flags {
p2p::State::Banned => {
let interval = now_utc().to_timespec().sec - x.last_banned;
// Unban peer
if interval >= BAN_WINDOW {
peers.unban_peer(&x.addr);
debug!(
LOGGER,
"monitor_peers: unbanned {} after {} seconds", x.addr, interval
);
} else {
banned_count += 1;
}
}
p2p::State::Healthy => healthy_count += 1,
p2p::State::Defunct => defunct_count += 1,
}
}
pub fn connect_and_monitor(
&self,
h: reactor::Handle,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
) {
// open a channel with a listener that connects every peer address sent below
// max peer count
let (tx, rx) = futures::sync::mpsc::unbounded();
h.spawn(self.listen_for_addrs(h.clone(), rx));
debug!(
LOGGER,
"monitor_peers: all {} = {} healthy + {} banned + {} defunct",
total_count,
healthy_count,
banned_count,
defunct_count,
);
// check seeds and start monitoring connections
let seeder = self.connect_to_seeds(tx.clone(), seed_list)
.join(self.monitor_peers(tx.clone()));
// maintenance step first, clean up p2p server peers
peers.clean_peers(PEER_MAX_COUNT as usize);
h.spawn(seeder.map(|_| ()).map_err(|e| {
error!(LOGGER, "Seeding or peer monitoring error: {}", e);
()
}));
// not enough peers, getting more from db
if peers.peer_count() >= PEER_PREFERRED_COUNT {
return;
}
fn monitor_peers(
&self,
tx: mpsc::UnboundedSender<SocketAddr>,
) -> Box<Future<Item = (), Error = String>> {
let peers = self.peers.clone();
let capabilities = self.capabilities.clone();
// loop over connected peers
// ask them for their list of peers
for p in peers.connected_peers() {
if let Ok(p) = p.try_read() {
debug!(LOGGER, "monitor_peers: ask {} for more peers", p.info.addr);
let _ = p.send_peer_request(capabilities);
} else {
warn!(LOGGER, "monitor_peers: failed to get read lock on peer");
}
}
// Unban peer after 3 hours
let ban_windows: i64 = 10800;
// find some peers from our db
// and queue them up for a connection attempt
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::UNKNOWN, 100);
for p in peers {
debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr);
tx.send(p.addr).unwrap();
}
}
// now spawn a new future to regularly check if we need to acquire more peers
// and if so, gets them from db and unban the banned peers after the ban is
// expired
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(30))
.for_each(move |_| {
let total_count = peers.all_peers().len();
debug!(
LOGGER,
"monitor_peers: {} most_work_peers, {} connected, {} total known",
peers.most_work_peers().len(),
peers.connected_peers().len(),
total_count,
);
// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
fn connect_to_seeds(
peers: p2p::Peers,
tx: mpsc::Sender<SocketAddr>,
seed_list: Box<Fn() -> Vec<SocketAddr>>,
) {
let mut healthy_count = 0;
let mut banned_count = 0;
let mut defunct_count = 0;
for x in peers.all_peers() {
if x.flags == p2p::State::Healthy {
healthy_count += 1
} else if x.flags == p2p::State::Banned {
let interval = now_utc().to_timespec().sec - x.last_banned;
if interval >= ban_windows {
// Unban peer
peers.unban_peer(&x.addr);
debug!(
LOGGER,
"monitor_peers: unbanned {} after {} seconds", x.addr, interval
);
} else {
banned_count += 1;
}
} else if x.flags == p2p::State::Defunct {
defunct_count += 1
};
}
// check if we have some peers in db
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::FULL_HIST, 100);
debug!(
LOGGER,
"monitor_peers: all {} = {} healthy + {} banned + {} defunct",
total_count,
healthy_count,
banned_count,
defunct_count,
);
// if so, get their addresses, otherwise use our seeds
let peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list()
};
// maintenance step first, clean up p2p server peers
{
peers.clean_peers(PEER_MAX_COUNT as usize);
}
if peer_addrs.len() == 0 {
warn!(LOGGER, "No seeds were retrieved.");
}
// not enough peers, getting more from db
if peers.peer_count() < PEER_PREFERRED_COUNT {
// loop over connected peers
// ask them for their list of peers
for p in peers.connected_peers() {
if let Ok(p) = p.try_read() {
debug!(
LOGGER,
"monitor_peers: asking {} for more peers", p.info.addr,
);
let _ = p.send_peer_request(capabilities);
} else {
warn!(LOGGER, "monitor_peers: failed to get read lock on peer",);
}
// connect to this first set of addresses
for addr in peer_addrs {
tx.send(addr).unwrap();
}
}
/// Regularly poll a channel receiver for new addresses and initiate a
/// connection if the max peer count isn't exceeded. A request for more
/// peers is also automatically sent after connection.
fn listen_for_addrs(
peers: p2p::Peers,
p2p: Arc<p2p::Server>,
capab: p2p::Capabilities,
rx: &mpsc::Receiver<SocketAddr>,
) {
let pc = peers.peer_count();
for addr in rx.try_iter() {
if pc < PEER_MAX_COUNT {
let connect_peer = p2p.connect(&addr);
match connect_peer {
Ok(p) => {
debug!(LOGGER, "connect_and_req: ok. attempting send_peer_request");
if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab);
}
// find some peers from our db
// and queue them up for a connection attempt
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::UNKNOWN, 100);
for p in peers {
debug!(LOGGER, "monitor_peers: queue to soon try {}", p.addr,);
tx.unbounded_send(p.addr).unwrap();
}
}
Ok(())
})
.map_err(|e| e.to_string());
Box::new(mon_loop)
}
// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
fn connect_to_seeds(
&self,
tx: mpsc::UnboundedSender<SocketAddr>,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
) -> Box<Future<Item = (), Error = String>> {
// a thread pool is required so we don't block the event loop with a
// db query
let thread_pool = cpupool::Builder::new()
.pool_size(1)
.name_prefix("seed")
.create();
let peers = self.peers.clone();
let seeder = thread_pool
.spawn_fn(move || {
// check if we have some peers in db
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::FULL_HIST, 100);
Ok(peers)
})
.and_then(|peers| {
// if so, get their addresses, otherwise use our seeds
if peers.len() > 3 {
Box::new(future::ok(peers.iter().map(|p| p.addr).collect::<Vec<_>>()))
} else {
seed_list
}
})
.and_then(move |peer_addrs| {
if peer_addrs.len() == 0 {
warn!(LOGGER, "No seeds were retrieved.");
}
// connect to this first set of addresses
for addr in peer_addrs {
tx.unbounded_send(addr).unwrap();
}
Ok(())
});
Box::new(seeder)
}
/// Builds a future to continuously listen on a channel receiver for new
/// addresses to and initiate a connection if the max peer count isn't
/// exceeded. A request for more peers is also automatically sent after
/// connection.
fn listen_for_addrs(
&self,
h: reactor::Handle,
rx: mpsc::UnboundedReceiver<SocketAddr>,
) -> Box<Future<Item = (), Error = ()>> {
let capab = self.capabilities;
let peers = self.peers.clone();
let p2p_server = self.p2p_server.clone();
let listener = rx.for_each(move |peer_addr| {
debug!(LOGGER, "New peer address to connect to: {}.", peer_addr);
let inner_h = h.clone();
if peers.peer_count() < PEER_MAX_COUNT {
h.spawn(connect_and_req(
capab,
p2p_server.clone(),
peers.clone(),
inner_h,
peer_addr,
))
},
Err(e) => {
debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e);
let _ = peers.update_state(addr, p2p::State::Defunct);
},
}
Box::new(future::ok(()))
});
Box::new(listener)
}
}
}
/// Extract the list of seeds from a pre-defined text file available through
/// http. Easy method until we have a set of DNS names we can rely on.
pub fn web_seeds(h: reactor::Handle) -> Box<Future<Item = Vec<SocketAddr>, Error = String>> {
let url = hyper::Uri::from_str(&SEEDS_URL).unwrap();
let seeds = future::ok(()).and_then(move |_| {
let client = hyper::Client::new(&h);
pub fn web_seeds() -> Box<Fn() -> Vec<SocketAddr> + Send> {
Box::new(|| {
let client = hyper::Client::new();
debug!(LOGGER, "Retrieving seed nodes from {}", &SEEDS_URL);
// http get, filtering out non 200 results
client
.get(url)
.map_err(|e| e.to_string())
.and_then(|res| {
if res.status() != hyper::Ok {
return Err(format!("Gist request failed: {}", res.status()));
}
Ok(res)
})
.and_then(|res| {
// collect all chunks and split around whitespace to get a list of SocketAddr
res.body()
.collect()
.map_err(|e| e.to_string())
.and_then(|chunks| {
let res = chunks.iter().fold("".to_string(), |acc, ref chunk| {
acc + str::from_utf8(&chunk[..]).unwrap()
});
let addrs = res.split_whitespace()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>();
debug!(LOGGER, "Retrieved seed addresses: {:?}", addrs);
Ok(addrs)
})
})
});
Box::new(seeds)
let mut res = client.get(SEEDS_URL).send().expect("Failed to resolve seeds.");
if res.status != hyper::Ok {
panic!("Failed to resolve seeds, got status {}.", res.status);
}
let mut buf = vec![];
res.read_to_end(&mut buf).expect("Could not read seed list.");
let text = str::from_utf8(&buf[..]).expect("Corrupted seed list.");
let addrs = text.split_whitespace()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>();
debug!(LOGGER, "Retrieved seed addresses: {:?}", addrs);
addrs
})
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn predefined_seeds(
addrs_str: Vec<String>,
) -> Box<Future<Item = Vec<SocketAddr>, Error = String>> {
let seeds = future::ok(()).and_then(move |_| {
Ok(addrs_str
pub fn predefined_seeds(addrs_str: Vec<String>) -> Box<Fn() -> Vec<SocketAddr> + Send> {
Box::new(move || {
addrs_str
.iter()
.map(|s| s.parse().unwrap())
.collect::<Vec<_>>())
});
Box::new(seeds)
}
fn connect_and_req(
capab: p2p::Capabilities,
p2p: Arc<p2p::Server>,
peers: p2p::Peers,
h: reactor::Handle,
addr: SocketAddr,
) -> Box<Future<Item = (), Error = ()>> {
let connect_peer = p2p.connect_peer(addr, h);
let peers = peers.clone();
let fut = connect_peer.then(move |p| {
match p {
Ok(Some(p)) => {
debug!(LOGGER, "connect_and_req: ok. attempting send_peer_request");
if let Ok(p) = p.try_read() {
let _ = p.send_peer_request(capab);
}
}
Ok(None) => {
debug!(
LOGGER,
"connect_and_req: ok but none inner (what does this mean?), {}", addr
);
}
Err(e) => {
debug!(LOGGER, "connect_and_req: {} is Defunct; {:?}", addr, e);
let _ = peers.update_state(addr, p2p::State::Defunct);
}
}
Ok(())
});
Box::new(fut)
.collect::<Vec<_>>()
})
}

View file

@ -22,11 +22,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time;
use futures::{Future, Stream};
use cpupool::CpuPool;
use tokio_core::reactor;
use tokio_timer::Timer;
use adapters::*;
use api;
use chain;
@ -40,13 +35,10 @@ use types::*;
use pow;
use util::LOGGER;
/// Grin server holding internal structures.
pub struct Server {
/// server config
pub config: ServerConfig,
/// event handle
evt_handle: reactor::Handle,
/// handle to our network server
p2p: Arc<p2p::Server>,
/// data store access
@ -59,28 +51,19 @@ pub struct Server {
impl Server {
/// Instantiates and starts a new server.
pub fn start(config: ServerConfig) -> Result<Server, Error> {
let mut evtlp = reactor::Core::new().unwrap();
let mut mining_config = config.mining_config.clone();
let serv = Server::future(config, &evtlp.handle())?;
let serv = Server::new(config)?;
if mining_config.as_mut().unwrap().enable_mining {
serv.start_miner(mining_config.unwrap());
}
let forever = Timer::default()
.interval(time::Duration::from_secs(60))
.for_each(move |_| {
debug!(LOGGER, "event loop running");
Ok(())
})
.map_err(|_| ());
evtlp.run(forever).unwrap();
Ok(serv)
loop {
thread::sleep(time::Duration::from_secs(10));
}
}
/// Instantiates a new server associated with the provided future reactor.
pub fn future(mut config: ServerConfig, evt_handle: &reactor::Handle) -> Result<Server, Error> {
pub fn new(mut config: ServerConfig) -> Result<Server, Error> {
let pool_adapter = Arc::new(PoolToChainAdapter::new());
let pool_net_adapter = Arc::new(PoolToNetAdapter::new());
let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(
@ -119,50 +102,34 @@ impl Server {
tx_pool.clone(),
));
// thread pool (single thread) for offloading handler.handle()
// work from the main run loop in p2p_server
let cpu_pool = CpuPool::new(1);
let p2p_config = config.p2p_config.clone();
let p2p_server = Arc::new(p2p::Server::new(
config.db_root.clone(),
config.capabilities,
p2p_config,
net_adapter.clone(),
genesis.hash(),
cpu_pool.clone(),
)?);
chain_adapter.init(p2p_server.peers.clone());
pool_net_adapter.init(p2p_server.peers.clone());
net_adapter.init(p2p_server.peers.clone());
let seed = seed::Seeder::new(
config.capabilities, p2p_server.clone(), p2p_server.peers.clone());
match config.seeding_type.clone() {
Seeding::None => {
warn!(
LOGGER,
"No seed(s) configured, will stay solo until connected to"
);
seed.connect_and_monitor(
evt_handle.clone(),
seed::predefined_seeds(vec![]),
);
}
Seeding::List => {
seed.connect_and_monitor(
evt_handle.clone(),
seed::predefined_seeds(config.seeds.as_mut().unwrap().clone()),
);
}
Seeding::WebStatic => {
seed.connect_and_monitor(
evt_handle.clone(),
seed::web_seeds(evt_handle.clone()),
);
}
_ => {}
if config.seeding_type.clone() != Seeding::Programmatic {
let seeder = match config.seeding_type.clone() {
Seeding::None => {
warn!(LOGGER, "No seed configured, will stay solo until connected to");
seed::predefined_seeds(vec![])
}
Seeding::List => {
seed::predefined_seeds(config.seeds.as_mut().unwrap().clone())
}
Seeding::WebStatic => {
seed::web_seeds()
}
_ => unreachable!(),
};
seed::connect_and_monitor(p2p_server.clone(), config.capabilities, seeder);
}
let skip_sync_wait = match config.skip_sync_wait {
@ -175,9 +142,12 @@ impl Server {
p2p_server.peers.clone(),
shared_chain.clone(),
skip_sync_wait,
);
);
evt_handle.spawn(p2p_server.start(evt_handle.clone()).map_err(|_| ()));
let p2p_inner = p2p_server.clone();
let _ = thread::Builder::new().name("p2p-server".to_string()).spawn(move || {
p2p_inner.listen()
});
info!(LOGGER, "Starting rest apis at: {}", &config.api_http_addr);
@ -190,8 +160,7 @@ impl Server {
warn!(LOGGER, "Grin server started.");
Ok(Server {
config: config.clone(),
evt_handle: evt_handle.clone(),
config: config,
p2p: p2p_server,
chain: shared_chain,
tx_pool: tx_pool,
@ -201,13 +170,7 @@ impl Server {
/// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> {
let handle = self.evt_handle.clone();
handle.spawn(
self.p2p
.connect_peer(addr, handle.clone())
.map(|_| ())
.map_err(|_| ()),
);
self.p2p.connect(&addr)?;
Ok(())
}

View file

@ -23,10 +23,6 @@ extern crate grin_util as util;
extern crate grin_wallet as wallet;
extern crate blake2_rfc as blake2;
extern crate futures;
extern crate futures_cpupool;
extern crate tokio_core;
extern crate tokio_timer;
use std::thread;
use std::time;
@ -34,9 +30,6 @@ use std::default::Default;
use std::fs;
use std::sync::{Arc, Mutex};
use self::tokio_core::reactor;
use self::tokio_timer::Timer;
use wallet::WalletConfig;
/// Just removes all results from previous runs
@ -189,8 +182,6 @@ impl LocalServerContainer {
}
pub fn run_server(&mut self, duration_in_seconds: u64) -> grin::ServerStats {
let mut event_loop = reactor::Core::new().unwrap();
let api_addr = format!("{}:{}", self.config.base_addr, self.config.api_server_port);
let mut seeding_type = grin::Seeding::None;
@ -201,7 +192,7 @@ impl LocalServerContainer {
seeds = vec![self.config.seed_addr.to_string()];
}
let s = grin::Server::future(
let s = grin::Server::new(
grin::ServerConfig {
api_http_addr: api_addr,
db_root: format!("{}/.grin", self.working_dir),
@ -215,7 +206,6 @@ impl LocalServerContainer {
skip_sync_wait:Some(true),
..Default::default()
},
&event_loop.handle(),
).unwrap();
self.p2p_server_stats = Some(s.get_server_stats().unwrap());
@ -253,10 +243,6 @@ impl LocalServerContainer {
s.connect_peer(p.parse().unwrap()).unwrap();
}
let timeout = Timer::default().sleep(time::Duration::from_secs(duration_in_seconds));
event_loop.run(timeout).unwrap();
if self.wallet_is_running {
self.stop_wallet();
}
@ -517,7 +503,7 @@ impl LocalServerContainerPool {
let handle = thread::spawn(move || {
if is_seeding && !s.config.is_seeding {
// there's a seed and we're not it, so hang around longer and give the seed
// a chance to start
// a chance to start
thread::sleep(time::Duration::from_millis(2000));
}
let server_ref = s.run_server(run_length);

View file

@ -21,20 +21,12 @@ extern crate grin_pow as pow;
extern crate grin_util as util;
extern crate grin_wallet as wallet;
extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;
mod framework;
use std::thread;
use std::time;
use std::default::Default;
use futures::{Async, Future, Poll};
use futures::task::current;
use tokio_core::reactor;
use core::global;
use core::global::ChainTypes;
@ -76,6 +68,7 @@ fn basic_genesis_mine() {
/// messages they all end up connected.
#[test]
fn simulate_seeding() {
util::init_test_logger();
global::set_mining_mode(ChainTypes::AutomatedTesting);
let test_name_dir = "simulate_seeding";
@ -114,7 +107,7 @@ fn simulate_seeding() {
pool.create_server(&mut server_config);
}
pool.connect_all_peers();
// pool.connect_all_peers();
let _ = pool.run_all_servers();
}
@ -169,7 +162,7 @@ fn simulate_parallel_mining() {
pool.create_server(&mut server_config);
}
pool.connect_all_peers();
// pool.connect_all_peers();
let _ = pool.run_all_servers();
@ -191,8 +184,6 @@ fn a_simulate_block_propagation() {
let test_name_dir = "grin-prop";
framework::clean_all_output(test_name_dir);
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
@ -212,7 +203,7 @@ fn a_simulate_block_propagation() {
// instantiates 5 servers on different ports
let mut servers = vec![];
for n in 0..5 {
let s = grin::Server::future(
let s = grin::Server::new(
grin::ServerConfig {
api_http_addr: format!("127.0.0.1:{}", 19000 + n),
db_root: format!("target/{}/grin-prop-{}", test_name_dir, n),
@ -225,7 +216,6 @@ fn a_simulate_block_propagation() {
chain_type: core::global::ChainTypes::AutomatedTesting,
..Default::default()
},
&handle,
).unwrap();
servers.push(s);
}
@ -236,10 +226,18 @@ fn a_simulate_block_propagation() {
// monitor for a change of head on a different server and check whether
// chain height has changed
evtlp.run(change(&servers[4]).and_then(|tip| {
assert!(tip.height == original_height + 1);
Ok(())
}));
loop {
let mut count = 0;
for n in 0..5 {
if servers[n].head().height > 3 {
count += 1;
}
}
if count == 5 {
break;
}
thread::sleep(time::Duration::from_millis(100));
}
}
/// Creates 2 different disconnected servers, mine a few blocks on one, connect
@ -255,9 +253,6 @@ fn simulate_full_sync() {
let test_name_dir = "grin-sync";
framework::clean_all_output(test_name_dir);
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
let mut plugin_config = pow::types::CuckooMinerPluginConfig::default();
let mut plugin_config_vec: Vec<pow::types::CuckooMinerPluginConfig> = Vec::new();
plugin_config.type_filter = String::from("mean_cpu");
@ -273,62 +268,30 @@ fn simulate_full_sync() {
..Default::default()
};
// instantiates 2 servers on different ports
let mut servers = vec![];
for n in 0..2 {
let config = grin::ServerConfig {
api_http_addr: format!("127.0.0.1:{}", 19000 + n),
db_root: format!("target/{}/grin-sync-{}", test_name_dir, n),
p2p_config: p2p::P2PConfig {
port: 11000 + n,
..p2p::P2PConfig::default()
},
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:11000".to_string()]),
chain_type: core::global::ChainTypes::AutomatedTesting,
..Default::default()
};
let s = grin::Server::future(config, &handle).unwrap();
servers.push(s);
}
let s1 = grin::Server::new(config(0, "grin-sync")).unwrap();
// mine a few blocks on server 1
servers[0].start_miner(miner_config);
s1.start_miner(miner_config);
thread::sleep(time::Duration::from_secs(5));
// 2 should get blocks
evtlp.run(change(&servers[1]));
}
// Builds the change future, monitoring for a change of head on the provided
// server
fn change<'a>(s: &'a grin::Server) -> HeadChange<'a> {
let start_head = s.head();
HeadChange {
server: s,
original: start_head,
let mut conf = config(1, "grin-sync");
conf.skip_sync_wait = Some(false);
let s2 = grin::Server::new(conf).unwrap();
while s2.head().height < 4 {
thread::sleep(time::Duration::from_millis(100));
}
}
/// Future that monitors when a server has had its head updated. Current
/// implementation isn't optimized, only use for tests.
struct HeadChange<'a> {
server: &'a grin::Server,
original: chain::Tip,
}
impl<'a> Future for HeadChange<'a> {
type Item = chain::Tip;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let new_head = self.server.head();
if new_head.last_block_h != self.original.last_block_h {
Ok(Async::Ready(new_head))
} else {
// egregious polling, asking the task to schedule us every iteration
current().notify();
Ok(Async::NotReady)
}
fn config(n: u16, test_name_dir: &str) -> grin::ServerConfig {
grin::ServerConfig {
api_http_addr: format!("127.0.0.1:{}", 19000 + n),
db_root: format!("target/{}/grin-sync-{}", test_name_dir, n),
p2p_config: p2p::P2PConfig {
port: 11000 + n,
..p2p::P2PConfig::default()
},
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:11000".to_string()]),
chain_type: core::global::ChainTypes::AutomatedTesting,
..Default::default()
}
}

View file

@ -7,17 +7,12 @@ workspace = ".."
[dependencies]
bitflags = "^1.0"
byteorder = "^0.5"
futures = "^0.1.15"
futures-cpupool = "^0.1.3"
slog = { version = "^2.0.12", features = ["max_level_trace", "release_max_level_trace"] }
net2 = "0.2.0"
rand = "^0.3"
serde = "~1.0.8"
serde_derive = "~1.0.8"
bytes = "0.4.3"
tokio-core="^0.1.1"
tokio-timer="^0.1.0"
tokio-io="^0.1"
time = "^0.1"
enum_primitive = "^0.1.0"
num = "^0.1.36"

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2016-2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -14,388 +14,170 @@
//! Provides a connection wrapper that handles the lower level tasks in sending
//! or receiving data from the TCP socket, as well as dealing with timeouts.
//!
//! Because of a few idiosyncracies in the Rust `TcpStream`, this has to use
//! async I/O to be able to both read *and* write on the connection. Which
//! forces us to go through some additional gymnastic to loop over the async
//! stream and make sure we get the right number of bytes out.
use std::iter;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::io::{self, Write};
use std::sync::{Arc, Mutex, mpsc};
use std::net::TcpStream;
use std::thread;
use std::time;
use futures;
use futures::{Future, Stream, stream};
use futures::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use tokio_timer::{Timer, TimerError};
use core::core::hash::Hash;
use core::ser;
use msg::*;
use types::Error;
use rate_limit::*;
use types::*;
use util::LOGGER;
/// Handler to provide to the connection, will be called back anytime a message
/// is received. The provided sender can be use to immediately send back
/// another message.
pub trait Handler: Sync + Send {
/// Handle function to implement to process incoming messages. A sender to
/// reply immediately as well as the message header and its unparsed body
/// are provided.
fn handle(
&self,
sender: UnboundedSender<Vec<u8>>,
header: MsgHeader,
body: Vec<u8>,
) -> Result<Option<Hash>, ser::Error>;
pub trait MessageHandler: Send + 'static {
fn consume(&self, msg: &mut Message) -> Result<Option<(Vec<u8>, Type)>, Error>;
}
impl<F> Handler for F
where
F: Fn(UnboundedSender<Vec<u8>>, MsgHeader, Vec<u8>)
-> Result<Option<Hash>, ser::Error>,
F: Sync + Send,
{
fn handle(
&self,
sender: UnboundedSender<Vec<u8>>,
header: MsgHeader,
body: Vec<u8>,
) -> Result<Option<Hash>, ser::Error> {
self(sender, header, body)
}
}
/// A higher level connection wrapping the TcpStream. Maintains the amount of
/// data transmitted and deals with the low-level task of sending and
/// receiving data, parsing message headers and timeouts.
#[allow(dead_code)]
pub struct Connection {
// Channel to push bytes to the remote peer
outbound_chan: UnboundedSender<Vec<u8>>,
// Close the connection with the remote peer
close_chan: Sender<()>,
// Bytes we've sent.
sent_bytes: Arc<Mutex<u64>>,
// Bytes we've received.
received_bytes: Arc<Mutex<u64>>,
// Counter for read errors.
error_count: Mutex<u64>,
}
impl Connection {
/// Start listening on the provided connection and wraps it. Does not hang
/// the current thread, instead just returns a future and the Connection
/// itself.
pub fn listen<F>(
conn: TcpStream,
pool: CpuPool,
handler: F,
) -> (Connection, Box<Future<Item = (), Error = Error>>)
where
F: Handler + 'static,
{
let (reader, writer) = conn.split();
// Set Max Read to 12 Mb/s
let reader = ThrottledReader::new(reader, 12_000_000);
// Set Max Write to 12 Mb/s
let writer = ThrottledWriter::new(writer, 12_000_000);
// prepare the channel that will transmit data to the connection writer
let (tx, rx) = futures::sync::mpsc::unbounded();
// same for closing the connection
let (close_tx, close_rx) = futures::sync::mpsc::channel(1);
let close_conn = close_rx
.for_each(|_| Ok(()))
.map_err(|_| Error::ConnectionClose);
let me = Connection {
outbound_chan: tx.clone(),
close_chan: close_tx,
sent_bytes: Arc::new(Mutex::new(0)),
received_bytes: Arc::new(Mutex::new(0)),
error_count: Mutex::new(0),
};
// setup the reading future, getting messages from the peer and processing them
let read_msg = me.read_msg(tx, reader, handler, pool).map(|_| ());
// setting the writing future
// getting messages from our system and sending them out
let write_msg = me.write_msg(rx, writer).map(|_| ());
// select between our different futures and return them
let fut = Box::new(
close_conn
.select(read_msg.select(write_msg).map(|_| ()).map_err(|(e, _)| e))
.map(|_| ())
.map_err(|(e, _)| e),
);
(me, fut)
}
/// Prepares the future that gets message data produced by our system and
/// sends it to the peer connection
fn write_msg<W>(
&self,
rx: UnboundedReceiver<Vec<u8>>,
writer: W,
) -> Box<Future<Item = W, Error = Error>>
where
W: AsyncWrite + 'static,
{
let sent_bytes = self.sent_bytes.clone();
let send_data = rx
.map_err(|_| Error::ConnectionClose)
.map(move |data| {
trace!(LOGGER, "write_msg: start");
// add the count of bytes sent
let mut sent_bytes = sent_bytes.lock().unwrap();
*sent_bytes += data.len() as u64;
data
})
// write the data and make sure the future returns the right types
.fold(writer, |writer, data| {
write_all(writer, data).map_err(|e| Error::Connection(e)).map(|(writer, _)| {
trace!(LOGGER, "write_msg: done");
writer
})
});
Box::new(send_data)
}
/// Prepares the future reading from the peer connection, parsing each
/// message and forwarding them appropriately based on their type
fn read_msg<F, R>(
&self,
sender: UnboundedSender<Vec<u8>>,
reader: R,
handler: F,
pool: CpuPool,
) -> Box<Future<Item = R, Error = Error>>
where
F: Handler + 'static,
R: AsyncRead + Send + 'static,
{
// infinite iterator stream so we repeat the message reading logic until the
// peer is stopped
let iter = stream::iter_ok(iter::repeat(()).map(Ok::<(), Error>));
// setup the reading future, getting messages from the peer and processing them
let recv_bytes = self.received_bytes.clone();
let handler = Arc::new(handler);
let mut count = 0;
let read_msg = iter.buffered(1).fold(reader, move |reader, _| {
count += 1;
trace!(LOGGER, "read_msg: count (per buffered fold): {}", count);
let recv_bytes = recv_bytes.clone();
let handler = handler.clone();
let sender_inner = sender.clone();
let pool = pool.clone();
// first read the message header
read_exact(reader, vec![0u8; HEADER_LEN as usize])
.from_err()
.and_then(move |(reader, buf)| {
trace!(LOGGER, "read_msg: start");
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
Ok((reader, header))
})
.and_then(move |(reader, header)| {
// now that we have a size, proceed with the body
read_exact(reader, vec![0u8; header.msg_len as usize])
.map(|(reader, buf)| (reader, header, buf))
.from_err()
})
.and_then(move |(reader, header, buf)| {
// add the count of bytes received
let mut recv_bytes = recv_bytes.lock().unwrap();
*recv_bytes += header.serialized_len() + header.msg_len;
pool.spawn_fn(move || {
let msg_type = header.msg_type;
if let Err(e) = handler.handle(sender_inner.clone(), header, buf) {
debug!(LOGGER, "Invalid {:?} message: {}", msg_type, e);
return Err(Error::Serialization(e));
}
trace!(LOGGER, "read_msg: done (via cpu_pool)");
Ok(reader)
})
})
});
Box::new(read_msg)
}
/// Utility function to send any Writeable. Handles adding the header and
/// serialization.
pub fn send_msg<W: ser::Writeable>(&self, t: Type, body: &W) -> Result<(), Error> {
let mut body_data = vec![];
try!(ser::serialize(&mut body_data, body));
let mut data = vec![];
try!(ser::serialize(
&mut data,
&MsgHeader::new(t, body_data.len() as u64),
));
data.append(&mut body_data);
self.outbound_chan
.unbounded_send(data)
.map_err(|_| Error::ConnectionClose)
}
/// Bytes sent and received by this peer to the remote peer.
pub fn transmitted_bytes(&self) -> (u64, u64) {
let sent = *self.sent_bytes.lock().unwrap();
let recv = *self.received_bytes.lock().unwrap();
(sent, recv)
}
}
/// Connection wrapper that handles a request/response oriented interaction with
/// a timeout.
pub struct TimeoutConnection {
underlying: Connection,
expected_responses: Arc<Mutex<Vec<InFlightRequest>>>,
}
#[derive(Debug, Clone)]
struct InFlightRequest {
msg_type: Type,
hash: Option<Hash>,
time: Instant,
}
impl TimeoutConnection {
/// Same as Connection
pub fn listen<F>(
conn: TcpStream,
pool: CpuPool,
handler: F,
) -> (TimeoutConnection, Box<Future<Item = (), Error = Error>>)
where
F: Handler + 'static,
{
let expects: Arc<Mutex<Vec<InFlightRequest>>> = Arc::new(Mutex::new(vec![]));
// Decorates the handler to remove the "subscription" from the expected
// responses. We got our replies, so no timeout should occur.
let exp = expects.clone();
let (conn, fut) = Connection::listen(conn, pool, move |sender, header: MsgHeader, data| {
let msg_type = header.msg_type;
let recv_h = try!(handler.handle(sender, header, data));
let mut expects = exp.lock().unwrap();
let filtered = expects
.iter()
.filter(|x| {
let res = x.msg_type != msg_type || x.hash.is_some() && x.hash != recv_h;
if res {
trace!(LOGGER, "timeout_conn: received: {:?}, {:?}", x.msg_type, x.hash);
}
res
})
.cloned()
.collect::<Vec<_>>();
*expects = filtered;
Ok(recv_h)
});
// Registers a timer with the event loop to regularly check for timeouts.
let exp = expects.clone();
let timer = Timer::default()
.interval(Duration::new(2, 0))
.fold((), move |_, _| {
let exp = exp.lock().unwrap();
trace!(LOGGER, "timeout_conn: currently registered: {:?}", exp.len());
for x in exp.iter() {
if Instant::now() - x.time > Duration::new(5, 0) {
trace!(LOGGER, "timeout_conn: timeout: {:?}, {:?}", x.msg_type, x.hash);
return Err(TimerError::TooLong);
}
}
Ok(())
})
.from_err();
let me = TimeoutConnection {
underlying: conn,
expected_responses: expects,
};
(
me,
Box::new(fut.select(timer).map(|_| ()).map_err(|(e1, _)| e1)),
)
}
/// Sends a request and registers a timer on the provided message type and
/// optionally the hash of the sent data.
/// Skips the request if we have already sent the same response and
/// we are still waiting for a response (not yet timed out).
pub fn send_request<W: ser::Writeable>(
&self,
t: Type,
rt: Type,
body: &W,
expect_h: Option<(Hash)>,
) -> Result<(), Error> {
{
let expects = self.expected_responses.lock().unwrap();
let existing = expects.iter().find(|x| {
x.msg_type == rt && x.hash == expect_h
});
if let Some(x) = existing {
trace!(
LOGGER,
"timeout_conn: in_flight: {:?}, {:?} (skipping)",
x.msg_type,
x.hash,
);
return Ok(())
// Macro to simplify the boilerplate around asyn I/O error handling,
// especially with WouldBlock kind of errors.
macro_rules! try_break {
($chan:ident, $inner:expr) => {
match $inner {
Ok(v) => Some(v),
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => {
//println!("++ not ready");
None
}
Err(e) => {
let _ = $chan.send(e);
break;
}
}
let _sent = try!(self.underlying.send_msg(t, body));
{
let mut expects = self.expected_responses.lock().unwrap();
let req = InFlightRequest {
msg_type: rt,
hash: expect_h,
time: Instant::now(),
};
trace!(
LOGGER,
"timeout_conn: registering: {:?}, {:?}",
req.msg_type,
req.hash,
);
expects.push(req);
}
Ok(())
}
/// Same as Connection
pub fn send_msg<W: ser::Writeable>(&self, t: Type, body: &W) -> Result<(), Error> {
self.underlying.send_msg(t, body)
}
/// Same as Connection
pub fn transmitted_bytes(&self) -> (u64, u64) {
self.underlying.transmitted_bytes()
}
}
pub struct Message<'a> {
pub header: MsgHeader,
conn: &'a mut TcpStream,
}
impl<'a> Message<'a> {
fn from_header(header: MsgHeader, conn: &'a mut TcpStream) -> Message<'a> {
Message{header, conn}
}
pub fn body<T>(&mut self) -> Result<T, Error> where T: ser::Readable {
read_body(&self.header, self.conn)
}
}
// TODO count sent and received
pub struct Tracker {
/// Bytes we've sent.
pub sent_bytes: Arc<Mutex<u64>>,
/// Bytes we've received.
pub received_bytes: Arc<Mutex<u64>>,
/// Channel to allow sending data through the connection
pub send_channel: mpsc::Sender<Vec<u8>>,
/// Channel to close the connection
pub close_channel: mpsc::Sender<()>,
/// Channel to check for errors on the connection
pub error_channel: mpsc::Receiver<Error>,
}
impl Tracker {
pub fn send<T>(&self, body: T, msg_type: Type) -> Result<(), Error>
where
T: ser::Writeable
{
let (header_buf, body_buf) = write_to_bufs(body, msg_type);
self.send_channel.send(header_buf)?;
self.send_channel.send(body_buf)?;
Ok(())
}
}
/// Start listening on the provided connection and wraps it. Does not hang
/// the current thread, instead just returns a future and the Connection
/// itself.
pub fn listen<H>(stream: TcpStream, handler: H) -> Tracker
where
H: MessageHandler,
{
let (send_tx, send_rx) = mpsc::channel();
let (close_tx, close_rx) = mpsc::channel();
let (error_tx, error_rx) = mpsc::channel();
stream.set_nonblocking(true).expect("Non-blocking IO not available.");
poll(stream, handler, send_rx, send_tx.clone(), error_tx, close_rx);
Tracker {
sent_bytes: Arc::new(Mutex::new(0)),
received_bytes: Arc::new(Mutex::new(0)),
send_channel: send_tx,
close_channel: close_tx,
error_channel: error_rx,
}
}
fn poll<H>(
conn: TcpStream,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
send_tx: mpsc::Sender<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>
)
where
H: MessageHandler,
{
let mut conn = conn;
let _ = thread::Builder::new().name("peer".to_string()).spawn(move || {
let sleep_time = time::Duration::from_millis(1);
let conn = &mut conn;
let mut retry_send = Err(());
loop {
// check the read end
if let Some(h) = try_break!(error_tx, read_header(conn)) {
let mut msg = Message::from_header(h, conn);
debug!(LOGGER, "Received message header, type {:?}, len {}.", msg.header.msg_type, msg.header.msg_len);
if let Some(Some((body, typ))) = try_break!(error_tx, handler.consume(&mut msg)) {
respond(&send_tx, typ, body);
}
}
// check the write end
if let Ok::<Vec<u8>, ()>(data) = retry_send {
if let None = try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from)) {
retry_send = Ok(data);
} else {
retry_send = Err(());
}
} else if let Ok(data) = send_rx.try_recv() {
if let None = try_break!(error_tx, conn.write_all(&data[..]).map_err(&From::from)) {
retry_send = Ok(data);
} else {
retry_send = Err(());
}
} else {
retry_send = Err(());
}
// check the close channel
if let Ok(_) = close_rx.try_recv() {
debug!(LOGGER,
"Connection close with {} initiated by us",
conn.peer_addr().map(|a| a.to_string()).unwrap_or("?".to_owned()));
break;
}
thread::sleep(sleep_time);
}
});
}
fn respond(send_tx: &mpsc::Sender<Vec<u8>>, msg_type: Type, body: Vec<u8>) {
let header = ser::ser_vec(&MsgHeader::new(msg_type, body.len() as u64)).unwrap();
send_tx.send(header).unwrap();
send_tx.send(body).unwrap();
}

View file

@ -13,20 +13,17 @@
// limitations under the License.
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::net::{TcpStream, SocketAddr};
use std::sync::{Arc, RwLock};
use futures::{self, Future};
use rand::Rng;
use rand::os::OsRng;
use tokio_core::net::TcpStream;
use core::core::target::Difficulty;
use core::core::hash::Hash;
use msg::*;
use peer::Peer;
use types::*;
use protocol::ProtocolV1;
use util::LOGGER;
const NONCES_CAP: usize = 100;
@ -56,20 +53,19 @@ impl Handshake {
}
}
/// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect(
pub fn initiate(
&self,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: SocketAddr,
conn: TcpStream,
) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
conn: &mut TcpStream,
) -> Result<PeerInfo, Error> {
// prepare the first part of the handshake
let nonce = self.next_nonce();
let peer_addr = match conn.peer_addr() {
Ok(pa) => pa,
Err(e) => return Box::new(futures::future::err(Error::Connection(e))),
Err(e) => return Err(Error::Connection(e)),
};
let hand = Hand {
@ -83,121 +79,105 @@ impl Handshake {
user_agent: USER_AGENT.to_string(),
};
let genesis = self.genesis.clone();
let config = self.config.clone();
// write and read the handshake response
Box::new(
write_msg(conn, hand, Type::Hand)
.and_then(|conn| read_msg::<Shake>(conn))
.and_then(move |(conn, shake)| {
if shake.version != PROTOCOL_VERSION {
Err(Error::ProtocolMismatch {
us: PROTOCOL_VERSION,
peer: shake.version,
})
} else if shake.genesis != genesis {
Err(Error::GenesisMismatch {
us: genesis,
peer: shake.genesis,
})
} else {
let peer_info = PeerInfo {
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: peer_addr,
version: shake.version,
total_difficulty: shake.total_difficulty,
};
write_message(conn, hand, Type::Hand)?;
let shake: Shake = read_message(conn, Type::Shake)?;
if shake.version != PROTOCOL_VERSION {
return Err(Error::ProtocolMismatch {
us: PROTOCOL_VERSION,
peer: shake.version,
});
} else if shake.genesis != self.genesis {
return Err(Error::GenesisMismatch {
us: self.genesis,
peer: shake.genesis,
});
}
let peer_info = PeerInfo {
capabilities: shake.capabilities,
user_agent: shake.user_agent,
addr: peer_addr,
version: shake.version,
total_difficulty: shake.total_difficulty,
};
// If denied then we want to close the connection
// (without providing our peer with any details why).
if Peer::is_denied(config, peer_info.addr) {
return Err(Error::ConnectionClose);
}
debug!(
LOGGER,
"Connected! Cumulative {} offered from {:?} {:?} {:?}",
peer_info.total_difficulty.into_num(),
peer_info.addr,
peer_info.user_agent,
peer_info.capabilities
);
// when more than one protocol version is supported, choosing should go here
Ok((conn, ProtocolV1::new(), peer_info))
}
}),
)
// If denied then we want to close the connection
// (without providing our peer with any details why).
if Peer::is_denied(&self.config, &peer_info.addr) {
return Err(Error::ConnectionClose);
}
debug!(
LOGGER,
"Connected! Cumulative {} offered from {:?} {:?} {:?}",
peer_info.total_difficulty.into_num(),
peer_info.addr,
peer_info.user_agent,
peer_info.capabilities
);
// when more than one protocol version is supported, choosing should go here
Ok(peer_info)
}
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake(
pub fn accept(
&self,
capab: Capabilities,
total_difficulty: Difficulty,
conn: TcpStream,
) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonces = self.nonces.clone();
let genesis = self.genesis.clone();
let config = self.config.clone();
conn: &mut TcpStream,
) -> Result<PeerInfo, Error> {
Box::new(
read_msg::<Hand>(conn)
.and_then(move |(conn, hand)| {
if hand.version != PROTOCOL_VERSION {
return Err(Error::ProtocolMismatch {
us: PROTOCOL_VERSION,
peer: hand.version,
});
} else if hand.genesis != genesis {
return Err(Error::GenesisMismatch {
us: genesis,
peer: hand.genesis,
});
} else {
// check the nonce to see if we are trying to connect to ourselves
let nonces = nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::PeerWithSelf);
}
}
let hand: Hand = read_message(conn, Type::Hand)?;
// all the reasons we could refuse this connection for
if hand.version != PROTOCOL_VERSION {
return Err(Error::ProtocolMismatch {
us: PROTOCOL_VERSION,
peer: hand.version,
});
} else if hand.genesis != self.genesis {
return Err(Error::GenesisMismatch {
us: self.genesis,
peer: hand.genesis,
});
} else {
// check the nonce to see if we are trying to connect to ourselves
let nonces = self.nonces.read().unwrap();
if nonces.contains(&hand.nonce) {
return Err(Error::PeerWithSelf);
}
}
// all good, keep peer info
let peer_info = PeerInfo {
capabilities: hand.capabilities,
user_agent: hand.user_agent,
addr: extract_ip(&hand.sender_addr.0, &conn),
version: hand.version,
total_difficulty: hand.total_difficulty,
};
// all good, keep peer info
let peer_info = PeerInfo {
capabilities: hand.capabilities,
user_agent: hand.user_agent,
addr: extract_ip(&hand.sender_addr.0, &conn),
version: hand.version,
total_difficulty: hand.total_difficulty,
};
// At this point we know the published ip and port of the peer
// so check if we are configured to explicitly allow or deny it.
// If denied then we want to close the connection
// (without providing our peer with any details why).
if Peer::is_denied(config, peer_info.addr) {
return Err(Error::ConnectionClose);
}
// At this point we know the published ip and port of the peer
// so check if we are configured to explicitly allow or deny it.
// If denied then we want to close the connection
// (without providing our peer with any details why).
if Peer::is_denied(&self.config, &peer_info.addr) {
return Err(Error::ConnectionClose);
}
// send our reply with our info
let shake = Shake {
version: PROTOCOL_VERSION,
capabilities: capab,
genesis: genesis,
total_difficulty: total_difficulty,
user_agent: USER_AGENT.to_string(),
};
Ok((conn, shake, peer_info))
})
.and_then(|(conn, shake, peer_info)| {
debug!(LOGGER, "Success handshake with {}.", peer_info.addr);
write_msg(conn, shake, Type::Shake)
// when more than one protocol version is supported, choosing should go here
.map(|conn| (conn, ProtocolV1::new(), peer_info))
}),
)
// send our reply with our info
let shake = Shake {
version: PROTOCOL_VERSION,
capabilities: capab,
genesis: self.genesis,
total_difficulty: total_difficulty,
user_agent: USER_AGENT.to_string(),
};
write_message(conn, shake, Type::Shake)?;
debug!(LOGGER, "Success handshake with {}.", peer_info.addr);
// when more than one protocol version is supported, choosing should go here
Ok(peer_info)
}
/// Generate a new random nonce and store it in our ring buffer

View file

@ -25,8 +25,6 @@ extern crate bitflags;
extern crate bytes;
#[macro_use]
extern crate enum_primitive;
extern crate futures;
extern crate futures_cpupool;
#[macro_use]
extern crate grin_core as core;
@ -40,22 +38,18 @@ extern crate serde_derive;
#[macro_use]
extern crate slog;
extern crate time;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_timer;
mod conn;
pub mod handshake;
mod rate_limit;
pub mod msg;
mod peer;
mod peers;
mod protocol;
mod server;
mod serv;
mod store;
mod types;
pub use server::{DummyAdapter, Server};
pub use serv::{Server, DummyAdapter};
pub use peers::Peers;
pub use peer::Peer;
pub use types::{Capabilities, Error, ChainAdapter, P2PConfig, PeerInfo, MAX_BLOCK_HEADERS,

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2016-2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -14,13 +14,12 @@
//! Message types that transit over the network and related serialization code.
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::io::{self, Read, Write};
use std::net::{TcpStream, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::thread;
use std::time;
use num::FromPrimitive;
use futures::future::{ok, Future};
use tokio_core::net::TcpStream;
use tokio_io::io::{read_exact, write_all};
use core::consensus::MAX_MSG_LEN;
use core::core::BlockHeader;
use core::core::hash::Hash;
@ -69,64 +68,106 @@ enum_from_primitive! {
}
}
/// Future combinator to read any message where the body is a Readable. Reads
/// the header first, handles its validation and then reads the Readable body,
/// allocating buffers of the right size.
pub fn read_msg<T>(conn: TcpStream) -> Box<Future<Item = (TcpStream, T), Error = Error>>
where
T: Readable + 'static,
{
let read_header = read_exact(conn, vec![0u8; HEADER_LEN as usize])
.from_err()
.and_then(|(reader, buf)| {
let header = try!(ser::deserialize::<MsgHeader>(&mut &buf[..]));
if header.msg_len > MAX_MSG_LEN {
// TODO add additional restrictions on a per-message-type basis to avoid 20MB
// pings
return Err(Error::Serialization(ser::Error::TooLargeReadErr));
}
Ok((reader, header))
});
/// The default implementation of read_exact is useless with async TcpStream as
/// will return as soon as something has been read, regardless of completeness.
/// This implementation will block until it has read exactly `len` bytes and
/// returns them as a `vec<u8>`. Additionally, a timeout in milliseconds will
/// abort the read when it's met. Note that the timeout time is approximate.
pub fn read_exact(conn: &mut TcpStream, mut buf: &mut [u8], timeout: u32) -> io::Result<()> {
let sleep_time = time::Duration::from_millis(1);
let mut count = 0;
let read_msg = read_header
.and_then(|(reader, header)| {
read_exact(reader, vec![0u8; header.msg_len as usize]).from_err()
})
.and_then(|(reader, buf)| {
let body = try!(ser::deserialize(&mut &buf[..]));
Ok((reader, body))
});
Box::new(read_msg)
loop {
match conn.read(buf) {
Ok(0) => break,
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => return Err(e),
}
if !buf.is_empty() {
thread::sleep(sleep_time);
count += 1;
} else {
break;
}
if count > timeout {
return Err(io::Error::new(io::ErrorKind::TimedOut, "reading from tcp stream"));
}
}
Ok(())
}
/// Future combinator to write a full message from a Writeable payload.
/// Serializes the payload first and then sends the message header and that
/// payload.
pub fn write_msg<T>(
conn: TcpStream,
/// Read a header from the provided connection without blocking if the
/// underlying stream is async. Typically headers will be polled for, so
/// we do not want to block.
pub fn read_header(conn: &mut TcpStream) -> Result<MsgHeader, Error> {
let mut head = vec![0u8; HEADER_LEN as usize];
conn.read_exact(&mut head)?;
let header = ser::deserialize::<MsgHeader>(&mut &head[..])?;
if header.msg_len > MAX_MSG_LEN {
// TODO additional restrictions for each msg type to avoid 20MB pings...
return Err(Error::Serialization(ser::Error::TooLargeReadErr));
}
Ok(header)
}
/// Read a message body from the provided connection, always blocking
/// until we have a result (or timeout).
pub fn read_body<T>(h: &MsgHeader, conn: &mut TcpStream) -> Result<T, Error>
where
T: Readable,
{
let mut body = vec![0u8; h.msg_len as usize];
read_exact(conn, &mut body, 15000)?;
ser::deserialize(&mut &body[..]).map_err(From::from)
}
/// Reads a full message from the underlying connection.
pub fn read_message<T>(conn: &mut TcpStream, msg_type: Type) -> Result<T, Error>
where
T: Readable,
{
let header = read_header(conn)?;
if header.msg_type != msg_type {
return Err(Error::BadMessage);
}
read_body(&header, conn)
}
pub fn write_to_bufs<T>(
msg: T,
msg_type: Type,
) -> Box<Future<Item = TcpStream, Error = Error>>
) -> (Vec<u8>, Vec<u8>)
where
T: Writeable,
{
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg).unwrap();
// build and serialize the header using the body size
let mut header_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen)).unwrap();
(header_buf, body_buf)
}
pub fn write_message<T>(
conn: &mut TcpStream,
msg: T,
msg_type: Type,
) -> Result<(), Error>
where
T: Writeable + 'static,
{
let write_msg = ok(conn).and_then(move |conn| {
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg).unwrap();
// build and serialize the header using the body size
let mut header_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut header_buf, &MsgHeader::new(msg_type, blen)).unwrap();
// send the whole thing
write_all(conn, header_buf)
.and_then(|(conn, _)| write_all(conn, body_buf))
.map(|(conn, _)| conn)
.from_err()
});
Box::new(write_msg)
let (header_buf, body_buf) = write_to_bufs(msg, msg_type);
// send the whole thing
conn.write_all(&header_buf[..])?;
conn.write_all(&body_buf[..])?;
Ok(())
}
/// Header of any protocol message, used to identify incoming messages.

View file

@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, RwLock};
use futures::Future;
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream;
use conn;
use core::core;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use handshake::Handshake;
use msg;
use protocol::Protocol;
use msg::*;
use types::*;
use util::LOGGER;
@ -37,10 +37,10 @@ enum State {
pub struct Peer {
pub info: PeerInfo,
proto: Box<Protocol>,
state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter,
connection: Option<conn::Tracker>
}
unsafe impl Sync for Peer {}
@ -48,77 +48,50 @@ unsafe impl Send for Peer {}
impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, proto: Box<Protocol>, na: Arc<NetAdapter>) -> Peer {
fn new(info: PeerInfo, na: Arc<NetAdapter>) -> Peer {
Peer {
info: info,
proto: proto,
state: Arc::new(RwLock::new(State::Connected)),
tracking_adapter: TrackingAdapter::new(na),
connection: None,
}
}
/// Initiates the handshake with another peer.
pub fn connect(
conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: SocketAddr,
hs: Arc<Handshake>,
na: Arc<NetAdapter>,
) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn)
.and_then(|(conn, proto, info)| {
Ok((conn, Peer::new(info, Box::new(proto), na)))
});
Box::new(connect_peer)
}
/// Accept a handshake initiated by another peer.
pub fn accept(
conn: TcpStream,
conn: &mut TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake,
na: Arc<NetAdapter>,
) -> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let hs_peer = hs.handshake(capab, total_difficulty, conn)
.and_then(|(conn, proto, info)| {
Ok((conn, Peer::new(info, Box::new(proto), na)))
});
Box::new(hs_peer)
) -> Result<Peer, Error> {
let info = hs.accept(capab, total_difficulty, conn)?;
Ok(Peer::new(info, na))
}
pub fn connect(
conn: &mut TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: SocketAddr,
hs: &Handshake,
na: Arc<NetAdapter>,
) -> Result<Peer, Error> {
let info = hs.initiate(capab, total_difficulty, self_addr, conn)?;
Ok(Peer::new(info, na))
}
/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn run(&self, conn: TcpStream, pool: CpuPool) -> Box<Future<Item = (), Error = Error>> {
pub fn start(&mut self, conn: TcpStream) {
let addr = self.info.addr;
let state = self.state.clone();
let adapter = Arc::new(self.tracking_adapter.clone());
Box::new(self.proto.handle(conn, adapter, addr, pool).then(move |res| {
// handle disconnection, standard disconnections aren't considered an error
let mut state = state.write().unwrap();
match res {
Ok(_) => {
*state = State::Disconnected;
info!(LOGGER, "Client {} disconnected.", addr);
Ok(())
}
Err(Error::Serialization(e)) => {
*state = State::Banned;
info!(LOGGER, "Client {} corrupted, ban.", addr);
Err(Error::Serialization(e))
}
Err(e) => {
*state = State::Disconnected;
debug!(LOGGER, "Client {} connection lost: {:?}", addr, e);
Ok(())
}
}
}))
let handler = Protocol::new(adapter, addr);
self.connection = Some(conn::listen(conn, handler));
}
pub fn is_denied(config: P2PConfig, peer_addr: SocketAddr) -> bool {
pub fn is_denied(config: &P2PConfig, peer_addr: &SocketAddr) -> bool {
let peer = format!("{}:{}", peer_addr.ip(), peer_addr.port());
if let Some(ref denied) = config.peers_deny {
if denied.contains(&peer) {
@ -142,12 +115,16 @@ impl Peer {
/// Whether this peer is still connected.
pub fn is_connected(&self) -> bool {
if !self.check_connection() {
return false
}
let state = self.state.read().unwrap();
*state == State::Connected
*state == State::Connected
}
/// Whether this peer has been banned.
pub fn is_banned(&self) -> bool {
let _ = self.check_connection();
let state = self.state.read().unwrap();
*state == State::Banned
}
@ -158,13 +135,10 @@ impl Peer {
*state = State::Banned;
}
/// Bytes sent and received by this peer to the remote peer.
pub fn transmitted_bytes(&self) -> (u64, u64) {
self.proto.transmitted_bytes()
}
/// Send a ping to the remote peer, providing our local difficulty and height
pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
self.proto.send_ping(total_difficulty, height)
let ping_msg = Ping{total_difficulty, height};
self.connection.as_ref().unwrap().send(ping_msg, msg::Type::Ping)
}
/// Sends the provided block to the remote peer. The request may be dropped
@ -172,7 +146,7 @@ impl Peer {
pub fn send_block(&self, b: &core::Block) -> Result<(), Error> {
if !self.tracking_adapter.has(b.hash()) {
debug!(LOGGER, "Send block {} to {}", b.hash(), self.info.addr);
self.proto.send_block(b)
self.connection.as_ref().unwrap().send(b, msg::Type::Block)
} else {
debug!(
LOGGER,
@ -187,7 +161,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<(), Error> {
if !self.tracking_adapter.has(b.hash()) {
debug!(LOGGER, "Send compact block {} to {}", b.hash(), self.info.addr);
self.proto.send_compact_block(b)
self.connection.as_ref().unwrap().send(b, msg::Type::CompactBlock)
} else {
debug!(
LOGGER,
@ -202,7 +176,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> {
if !self.tracking_adapter.has(bh.hash()) {
debug!(LOGGER, "Send header {} to {}", bh.hash(), self.info.addr);
self.proto.send_header(bh)
self.connection.as_ref().unwrap().send(bh, msg::Type::Header)
} else {
debug!(
LOGGER,
@ -219,34 +193,64 @@ impl Peer {
pub fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
if !self.tracking_adapter.has(tx.hash()) {
debug!(LOGGER, "Send tx {} to {}", tx.hash(), self.info.addr);
self.proto.send_transaction(tx)
self.connection.as_ref().unwrap().send(tx, msg::Type::Transaction)
} else {
debug!(LOGGER, "Not sending tx {} to {} (already seen)", tx.hash(), self.info.addr);
Ok(())
}
}
/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.proto.send_header_request(locator)
self.connection.as_ref().unwrap().send(
&Locator {
hashes: locator,
},
msg::Type::GetHeaders)
}
/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!(LOGGER, "Requesting block {} from {}", h, self.info.addr);
self.proto.send_block_request(h)
debug!(LOGGER, "Requesting block {} from peer {}.", h, self.info.addr);
self.connection.as_ref().unwrap().send(&h, msg::Type::GetBlock)
}
/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!(LOGGER, "Requesting compact block {} from {}", h, self.info.addr);
self.proto.send_compact_block_request(h)
self.connection.as_ref().unwrap().send(&h, msg::Type::GetCompactBlock)
}
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
debug!(LOGGER, "Asking {} for more peers.", self.info.addr);
self.proto.send_peer_request(capab)
self.connection.as_ref().unwrap().send(
&GetPeerAddrs {
capabilities: capab,
},
msg::Type::GetPeerAddrs)
}
/// Stops the peer, closing its connection
pub fn stop(&self) {
self.proto.close();
let _ = self.connection.as_ref().unwrap().close_channel.send(());
}
fn check_connection(&self) -> bool {
match self.connection.as_ref().unwrap().error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let mut state = self.state.write().unwrap();
*state = State::Banned;
info!(LOGGER, "Client {} corrupted, ban ({:?}).", self.info.addr, e);
false
}
Ok(e) => {
let mut state = self.state.write().unwrap();
*state = State::Disconnected;
debug!(LOGGER, "Client {} connection lost: {:?}", self.info.addr, e);
false
}
Err(_) => true,
}
}
}

View file

@ -25,7 +25,7 @@ use util::LOGGER;
use time;
use peer::Peer;
use store::{PeerData, PeerStore, State};
use store::{PeerStore, PeerData, State};
use types::*;
#[derive(Clone)]

View file

@ -15,340 +15,158 @@
use std::sync::Arc;
use std::net::SocketAddr;
use futures::Future;
use futures::sync::mpsc::UnboundedSender;
use futures_cpupool::CpuPool;
use rand;
use rand::Rng;
use tokio_core::net::TcpStream;
use core::core;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use core::ser;
use conn::TimeoutConnection;
use conn::*;
use msg::*;
use rand;
use rand::Rng;
use types::*;
use util::LOGGER;
use util::OneTime;
#[allow(dead_code)]
pub struct ProtocolV1 {
conn: OneTime<TimeoutConnection>,
}
impl ProtocolV1 {
pub fn new() -> ProtocolV1 {
ProtocolV1 {
conn: OneTime::new(),
}
}
}
impl Protocol for ProtocolV1 {
/// Sets up the protocol reading, writing and closing logic.
fn handle(
&self,
conn: TcpStream,
adapter: Arc<NetAdapter>,
addr: SocketAddr,
pool: CpuPool,
) -> Box<Future<Item = (), Error = Error>> {
let (conn, listener) = TimeoutConnection::listen(conn, pool, move |sender, header, data| {
let adapt = adapter.as_ref();
handle_payload(adapt, sender, header, data, addr)
});
self.conn.init(conn);
listener
}
/// Bytes sent and received.
fn transmitted_bytes(&self) -> (u64, u64) {
self.conn.borrow().transmitted_bytes()
}
/// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol.
fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
self.send_request(
Type::Ping,
Type::Pong,
&Ping { total_difficulty, height },
None,
)
}
/// Serializes and sends a block to our remote peer
fn send_block(&self, b: &core::Block) -> Result<(), Error> {
self.send_msg(Type::Block, b)
}
fn send_compact_block(&self, cb: &core::CompactBlock) -> Result<(), Error> {
self.send_msg(Type::CompactBlock, cb)
}
/// Serializes and sends a block header to our remote peer ("header first" propagation)
fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error> {
self.send_msg(Type::Header, bh)
}
/// Serializes and sends a transaction to our remote peer
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
self.send_msg(Type::Transaction, tx)
}
fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.send_request(
Type::GetHeaders,
Type::Headers,
&Locator { hashes: locator },
None,
)
}
fn send_block_request(&self, h: Hash) -> Result<(), Error> {
self.send_request(Type::GetBlock, Type::Block, &h, Some(h))
}
fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
self.send_request(Type::GetCompactBlock, Type::CompactBlock, &h, Some(h))
}
fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
self.send_request(
Type::GetPeerAddrs,
Type::PeerAddrs,
&GetPeerAddrs {
capabilities: capab,
},
None,
)
}
/// Close the connection to the remote peer
fn close(&self) {
// TODO some kind of shutdown signal
}
}
impl ProtocolV1 {
fn send_msg<W: ser::Writeable>(&self, t: Type, body: &W) -> Result<(), Error> {
self.conn.borrow().send_msg(t, body)
}
fn send_request<W: ser::Writeable>(
&self,
t: Type,
rt: Type,
body: &W,
expect_resp: Option<Hash>,
) -> Result<(), Error> {
if self.conn.is_initialized() {
self.conn.borrow().send_request(t, rt, body, expect_resp)
} else {
Ok(())
}
}
}
fn handle_payload(
adapter: &NetAdapter,
sender: UnboundedSender<Vec<u8>>,
header: MsgHeader,
buf: Vec<u8>,
pub struct Protocol {
adapter: Arc<NetAdapter>,
addr: SocketAddr,
) -> Result<Option<Hash>, ser::Error> {
match header.msg_type {
Type::Ping => {
let ping = ser::deserialize::<Ping>(&mut &buf[..])?;
adapter.peer_difficulty(addr, ping.total_difficulty, ping.height);
let pong = Pong { total_difficulty: adapter.total_difficulty(), height: adapter.total_height() };
let mut body_data = vec![];
try!(ser::serialize(&mut body_data, &pong));
let mut data = vec![];
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::Pong, body_data.len() as u64),
));
data.append(&mut body_data);
}
if let Err(e) = sender.unbounded_send(data) {
debug!(LOGGER, "handle_payload: Ping, error sending: {:?}", e);
impl Protocol {
pub fn new(adapter: Arc<NetAdapter>, addr: SocketAddr) -> Protocol {
Protocol{adapter, addr}
}
}
impl MessageHandler for Protocol {
fn consume(&self, msg: &mut Message) -> Result<Option<(Vec<u8>, Type)>, Error> {
let adapter = &self.adapter;
match msg.header.msg_type {
Type::Ping => {
let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height);
let pong_bytes = ser::ser_vec(
&Pong {
total_difficulty: adapter.total_difficulty(),
height: adapter.total_height(),
}).unwrap();
Ok(Some((pong_bytes, Type::Pong)))
}
Ok(None)
}
Type::Pong => {
let pong = ser::deserialize::<Pong>(&mut &buf[..])?;
adapter.peer_difficulty(addr, pong.total_difficulty, pong.height);
Ok(None)
},
Type::Transaction => {
let tx = ser::deserialize::<core::Transaction>(&mut &buf[..])?;
debug!(LOGGER, "handle_payload: Transaction: {}", tx.hash());
Type::Pong => {
let pong: Pong = msg.body()?;
adapter.peer_difficulty(self.addr, pong.total_difficulty, pong.height);
Ok(None)
},
adapter.transaction_received(tx);
Ok(None)
}
Type::GetBlock => {
let h = ser::deserialize::<Hash>(&mut &buf[..])?;
debug!(LOGGER, "handle_payload: GetBlock: {}", h);
Type::Transaction => {
let tx: core::Transaction = msg.body()?;
adapter.transaction_received(tx);
Ok(None)
}
let bo = adapter.get_block(h);
if let Some(b) = bo {
// serialize and send the block over
let mut body_data = vec![];
try!(ser::serialize(&mut body_data, &b));
let mut data = vec![];
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::Block, body_data.len() as u64),
));
data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) {
debug!(LOGGER, "handle_payload: GetBlock, error sending: {:?}", e);
Type::GetBlock => {
let h: Hash = msg.body()?;
debug!(LOGGER, "handle_payload: GetBlock {}", h);
let bo = adapter.get_block(h);
if let Some(b) = bo {
let block_bytes = ser::ser_vec(&b).unwrap();
return Ok(Some((block_bytes, Type::Block)));
}
Ok(None)
}
Ok(None)
}
Type::Block => {
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
let bh = b.hash();
debug!(LOGGER, "handle_payload: Block: {}", bh);
adapter.block_received(b, addr);
Ok(Some(bh))
}
Type::GetCompactBlock => {
let h = ser::deserialize::<Hash>(&mut &buf[..])?;
debug!(LOGGER, "handle_payload: GetCompactBlock: {}", h);
Type::Block => {
let b: core::Block = msg.body()?;
let bh = b.hash();
if let Some(b) = adapter.get_block(h) {
let cb = b.as_compact_block();
debug!(LOGGER, "handle_payload: Block {}", bh);
// serialize and send the block over in compact representation
let mut body_data = vec![];
let mut data = vec![];
adapter.block_received(b, self.addr);
Ok(None)
}
// if we have txs in the block send a compact block
// but if block is empty -
// to allow us to test all code paths, randomly choose to send
// either the block or the compact block
let mut rng = rand::thread_rng();
Type::GetCompactBlock => {
let h: Hash = msg.body()?;
debug!(LOGGER, "handle_payload: GetCompactBlock: {}", h);
if cb.kern_ids.is_empty() && rng.gen() {
debug!(
LOGGER,
"handle_payload: GetCompactBlock: empty block, sending full block",
);
if let Some(b) = adapter.get_block(h) {
let cb = b.as_compact_block();
try!(ser::serialize(&mut body_data, &b));
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::Block, body_data.len() as u64),
));
// serialize and send the block over in compact representation
// if we have txs in the block send a compact block
// but if block is empty -
// to allow us to test all code paths, randomly choose to send
// either the block or the compact block
let mut rng = rand::thread_rng();
if cb.kern_ids.is_empty() && rng.gen() {
debug!(
LOGGER,
"handle_payload: GetCompactBlock: empty block, sending full block",
);
let block_bytes = ser::ser_vec(&b).unwrap();
Ok(Some((block_bytes, Type::Block)))
} else {
let compact_block_bytes = ser::ser_vec(&cb).unwrap();
Ok(Some((compact_block_bytes, Type::CompactBlock)))
}
} else {
try!(ser::serialize(&mut body_data, &cb));
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::CompactBlock, body_data.len() as u64),
));
}
data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) {
debug!(LOGGER, "handle_payload: GetCompactBlock, error sending: {:?}", e);
Ok(None)
}
}
Ok(None)
}
Type::CompactBlock => {
let b = ser::deserialize::<core::CompactBlock>(&mut &buf[..])?;
let bh = b.hash();
debug!(LOGGER, "handle_payload: CompactBlock: {}", bh);
adapter.compact_block_received(b, addr);
Ok(Some(bh))
}
// A peer is asking us for some headers via a locator
Type::GetHeaders => {
let loc = ser::deserialize::<Locator>(&mut &buf[..])?;
debug!(LOGGER, "handle_payload: GetHeaders: {:?}", loc);
Type::CompactBlock => {
let b: core::CompactBlock = msg.body()?;
let bh = b.hash();
debug!(LOGGER, "handle_payload: CompactBlock: {}", bh);
let headers = adapter.locate_headers(loc.hashes);
// serialize and send all the headers over
let mut body_data = vec![];
try!(ser::serialize(
&mut body_data,
&Headers { headers: headers },
));
let mut data = vec![];
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::Headers, body_data.len() as u64),
));
data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) {
debug!(LOGGER, "handle_payload: GetHeaders, error sending: {:?}", e);
adapter.compact_block_received(b, self.addr);
Ok(None)
}
Ok(None)
}
// "header first" block propagation - if we have not yet seen this block
// we can go request it from some of our peers
Type::Header => {
let header = ser::deserialize::<core::BlockHeader>(&mut &buf[..])?;
debug!(LOGGER, "handle_payload: Header: {}", header.hash());
Type::GetHeaders => {
// load headers from the locator
let loc: Locator = msg.body()?;
let headers = adapter.locate_headers(loc.hashes);
adapter.header_received(header, addr);
// we do not return a hash here as we never request a single header
// a header will always arrive unsolicited
Ok(None)
}
// receive headers as part of the sync process
Type::Headers => {
let headers = ser::deserialize::<Headers>(&mut &buf[..])?;
debug!(LOGGER, "handle_payload: Headers: {}", headers.headers.len());
adapter.headers_received(headers.headers, addr);
Ok(None)
}
Type::GetPeerAddrs => {
let get_peers = ser::deserialize::<GetPeerAddrs>(&mut &buf[..])?;
let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities);
// serialize and send all the headers over
let mut body_data = vec![];
try!(ser::serialize(
&mut body_data,
&PeerAddrs {
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
},
));
let mut data = vec![];
try!(ser::serialize(
&mut data,
&MsgHeader::new(Type::PeerAddrs, body_data.len() as u64),
));
data.append(&mut body_data);
if let Err(e) = sender.unbounded_send(data) {
debug!(LOGGER, "handle_payload: GetPeerAddrs, error sending: {:?}", e);
// serialize and send all the headers over
let header_bytes = ser::ser_vec(&Headers { headers: headers }).unwrap();
return Ok(Some((header_bytes, Type::Headers)));
}
Ok(None)
}
Type::PeerAddrs => {
let peer_addrs = ser::deserialize::<PeerAddrs>(&mut &buf[..])?;
adapter.peer_addrs_received(peer_addrs.peers.iter().map(|pa| pa.0).collect());
Ok(None)
}
_ => {
debug!(LOGGER, "unknown message type {:?}", header.msg_type);
Ok(None)
Type::Headers => {
let headers: Headers = msg.body()?;
adapter.headers_received(headers.headers, self.addr);
Ok(None)
}
Type::GetPeerAddrs => {
let get_peers: GetPeerAddrs = msg.body()?;
let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities);
let peer_addrs_bytes = ser::ser_vec(
&PeerAddrs {
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
}).unwrap();
return Ok(Some((peer_addrs_bytes, Type::PeerAddrs)));
}
Type::PeerAddrs => {
let peer_addrs: PeerAddrs = msg.body()?;
adapter.peer_addrs_received(peer_addrs.peers.iter().map(|pa| pa.0).collect());
Ok(None)
}
_ => {
debug!(LOGGER, "unknown message type {:?}", msg.header.msg_type);
Ok(None)
}
}
}
}

View file

@ -1,224 +0,0 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Provides wrappers for throttling readers and writers
use std::time::Instant;
use std::{cmp, io};
use futures::*;
use tokio_io::*;
/// A Rate Limited Reader
#[derive(Debug)]
pub struct ThrottledReader<R: AsyncRead> {
reader: R,
/// Max Bytes per second
max: u32,
/// Stores a count of last request and last request time
allowed: usize,
last_check: Instant,
}
#[allow(dead_code)]
impl<R: AsyncRead> ThrottledReader<R> {
/// Adds throttling to a reader.
/// The resulting reader will read at most `max` amount of bytes per second
pub fn new(reader: R, max: u32) -> Self {
ThrottledReader {
reader: reader,
max: max,
allowed: max as usize,
last_check: Instant::now(),
}
}
/// Get a shared reference to the inner sink.
pub fn get_ref(&self) -> &R {
&self.reader
}
/// Get a mutable reference to the inner sink.
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
/// Consumes this combinator, returning the underlying sink.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> R {
self.reader
}
}
impl<R: AsyncRead> io::Read for ThrottledReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// Check passed Time
let time_passed = self.last_check.elapsed();
self.last_check = Instant::now();
self.allowed += time_passed.as_secs() as usize * self.max as usize;
// Throttle
if self.allowed > self.max as usize {
self.allowed = self.max as usize;
}
// Check if Allowed
if self.allowed < 1 {
return Err(io::Error::new(
io::ErrorKind::WouldBlock,
"Reached Allowed Read Limit",
));
}
// Read Max Allowed
let buf = if buf.len() > self.allowed {
&mut buf[0..self.allowed]
} else {
buf
};
let res = self.reader.read(buf);
// Decrement Allowed amount written
if let Ok(n) = res {
self.allowed -= n;
}
res
}
}
impl<R: AsyncRead> AsyncRead for ThrottledReader<R> {}
/// A Rate Limited Writer
#[derive(Debug)]
pub struct ThrottledWriter<W: AsyncWrite> {
writer: W,
/// Max Bytes per second
max: u64,
/// Stores a count of last request and last request time
allowed: u64,
last_check: Instant,
}
#[allow(dead_code)]
impl<W: AsyncWrite> ThrottledWriter<W> {
/// Adds throttling to a writer.
/// The resulting writer will write at most `max` amount of bytes per second
pub fn new(writer: W, max: u64) -> Self {
ThrottledWriter {
writer: writer,
max: max,
allowed: max,
last_check: Instant::now(),
}
}
/// Get a shared reference to the inner sink.
pub fn get_ref(&self) -> &W {
&self.writer
}
/// Get a mutable reference to the inner sink.
pub fn get_mut(&mut self) -> &mut W {
&mut self.writer
}
/// Consumes this combinator, returning the underlying sink.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> W {
self.writer
}
}
impl<W: AsyncWrite> io::Write for ThrottledWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// Check passed Time
let time_passed = self.last_check.elapsed();
self.last_check = Instant::now();
self.allowed += time_passed.as_secs() * self.max;
// Throttle
if self.allowed > self.max {
self.allowed = cmp::min(self.max, u32::max_value() as u64);
}
// Check if Allowed
if self.allowed < 1 {
return Err(io::Error::new(
io::ErrorKind::WouldBlock,
"Reached Allowed Write Limit",
));
}
// Write max allowed
let buf = if buf.len() > self.allowed as usize {
&buf[0..self.allowed as usize]
} else {
buf
};
let res = self.writer.write(buf);
// Decrement Allowed amount written
if let Ok(n) = res {
self.allowed -= n as u64;
}
res
}
fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}
impl<T: AsyncWrite> AsyncWrite for ThrottledWriter<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.writer.shutdown()
}
}
#[cfg(test)]
mod test {
use super::*;
use std::io::Cursor;
#[test]
fn should_throttle_write() {
let buf = vec![0; 64];
let mut t_buf = ThrottledWriter::new(Cursor::new(buf), 8);
for _ in 0..16 {
let _ = t_buf.write_buf(&mut Cursor::new(vec![1; 8]));
}
let cursor = t_buf.into_inner();
assert_eq!(cursor.position(), 8);
}
#[test]
fn should_throttle_read() {
let buf = vec![1; 64];
let mut t_buf = ThrottledReader::new(Cursor::new(buf), 8);
let mut dst = Cursor::new(vec![0; 64]);
for _ in 0..16 {
let _ = t_buf.read_buf(&mut dst);
}
assert_eq!(dst.position(), 8);
}
}

190
p2p/src/serv.rs Normal file
View file

@ -0,0 +1,190 @@
// Copyright 2016-2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
use std::time::Duration;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use handshake::Handshake;
use peer::Peer;
use peers::Peers;
use store::PeerStore;
use types::*;
use util::LOGGER;
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
handshake: Arc<Handshake>,
pub peers: Peers,
}
unsafe impl Sync for Server {}
unsafe impl Send for Server {}
// TODO TLS
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
db_root: String,
capab: Capabilities,
config: P2PConfig,
adapter: Arc<ChainAdapter>,
genesis: Hash,
) -> Result<Server, Error> {
Ok(Server {
config: config.clone(),
capabilities: capab,
handshake: Arc::new(Handshake::new(genesis, config.clone())),
peers: Peers::new(PeerStore::new(db_root)?, adapter, config),
})
}
pub fn listen(&self) -> Result<(), Error> {
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = TcpListener::bind(addr)?;
for stream in listener.incoming() {
match stream {
Ok(stream) => {
if !self.check_banned(&stream) {
let peer_addr = stream.peer_addr();
if let Err(e) = self.handle_new_peer(stream) {
debug!(
LOGGER,
"Error accepting peer {}: {:?}",
peer_addr.map(|a| a.to_string()).unwrap_or("?".to_owned()),
e);
}
}
}
Err(e) => {
warn!(LOGGER, "Couldn't establish new client connection: {:?}", e);
}
}
}
Ok(())
}
/// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address.
pub fn connect(&self, addr: &SocketAddr) -> Result<Arc<RwLock<Peer>>, Error> {
if Peer::is_denied(&self.config, &addr) {
debug!(LOGGER, "Peer {} denied, not connecting.", addr);
return Err(Error::ConnectionClose);
}
if let Some(p) = self.peers.get_connected_peer(addr) {
// if we're already connected to the addr, just return the peer
debug!(LOGGER, "connect_peer: already connected {}", addr);
return Ok(p);
}
debug!(LOGGER, "connect_peer: connecting to {}", addr);
match TcpStream::connect_timeout(addr, Duration::from_secs(10)) {
Ok(mut stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty();
let peer = Peer::connect(
&mut stream,
self.capabilities,
total_diff,
addr,
&self.handshake,
Arc::new(self.peers.clone()),
)?;
let added = self.peers.add_connected(peer);
{
let mut peer = added.write().unwrap();
peer.start(stream);
}
Ok(added)
}
Err(e) => {
debug!(LOGGER, "Couldn not connect to {}: {:?}", addr, e);
Err(Error::Connection(e))
}
}
}
fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty();
// accept the peer and add it to the server map
let peer = Peer::accept(
&mut stream,
self.capabilities,
total_diff,
&self.handshake,
Arc::new(self.peers.clone()),
)?;
let added = self.peers.add_connected(peer);
let mut peer = added.write().unwrap();
peer.start(stream);
Ok(())
}
fn check_banned(&self, stream: &TcpStream) -> bool {
// peer has been banned, go away!
if let Ok(peer_addr) = stream.peer_addr() {
if self.peers.is_banned(peer_addr) {
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!(LOGGER, "Error shutting down conn: {:?}", e);
}
return true;
}
}
false
}
}
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Difficulty {
Difficulty::one()
}
fn total_height(&self) -> u64 {
0
}
fn transaction_received(&self, _: core::Transaction) {}
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { true }
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true }
fn block_received(&self, _: core::Block, _: SocketAddr) -> bool { true }
fn headers_received(&self, _: Vec<core::BlockHeader>, _:SocketAddr) {}
fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> {
vec![]
}
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
}
impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
vec![]
}
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {}
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {}
}

View file

@ -1,320 +0,0 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Grin server implementation, accepts incoming connections and connects to
//! other peers in the network.
use std::cell::RefCell;
use std::net::{SocketAddr, Shutdown};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use futures;
use futures::{Future, Stream};
use futures::future::{self, IntoFuture};
use futures_cpupool::CpuPool;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
use tokio_timer::Timer;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use handshake::Handshake;
use peer::Peer;
use peers::Peers;
use store::PeerStore;
use types::*;
use util::LOGGER;
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl ChainAdapter for DummyAdapter {
fn total_difficulty(&self) -> Difficulty {
Difficulty::one()
}
fn total_height(&self) -> u64 {
0
}
fn transaction_received(&self, _tx: core::Transaction) {}
fn block_received(&self, _b: core::Block, _addr: SocketAddr) -> bool { true }
fn compact_block_received(&self, _cb: core::CompactBlock, _addr: SocketAddr) -> bool { true }
fn header_received(&self, _bh: core::BlockHeader, _addr: SocketAddr) -> bool { true }
fn headers_received(&self, _bh: Vec<core::BlockHeader>, _addr:SocketAddr) {}
fn locate_headers(&self, _loc: Vec<Hash>) -> Vec<core::BlockHeader> {
vec![]
}
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
}
impl NetAdapter for DummyAdapter {
fn find_peer_addrs(&self, _: Capabilities) -> Vec<SocketAddr> {
vec![]
}
fn peer_addrs_received(&self, _: Vec<SocketAddr>) {}
fn peer_difficulty(&self, _: SocketAddr, _: Difficulty, _:u64) {}
}
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
handshake: Arc<Handshake>,
pub peers: Peers,
pool: CpuPool,
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
}
unsafe impl Sync for Server {}
unsafe impl Send for Server {}
// TODO TLS
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
db_root: String,
capab: Capabilities,
config: P2PConfig,
adapter: Arc<ChainAdapter>,
genesis: Hash,
pool: CpuPool,
) -> Result<Server, Error> {
Ok(Server {
config: config.clone(),
capabilities: capab,
handshake: Arc::new(Handshake::new(genesis, config.clone())),
peers: Peers::new(PeerStore::new(db_root)?, adapter, config.clone()),
pool: pool,
stop: RefCell::new(None),
})
}
/// Starts the p2p server. Opens a TCP port to allow incoming
/// connections and starts the bootstrapping process to find peers.
pub fn start(&self, h: reactor::Handle) -> Box<Future<Item = (), Error = Error>> {
let addr = SocketAddr::new(self.config.host, self.config.port);
let socket = TcpListener::bind(&addr, &h.clone()).unwrap();
warn!(LOGGER, "P2P server started on {}", addr);
let handshake = self.handshake.clone();
let peers = self.peers.clone();
let capab = self.capabilities.clone();
let pool = self.pool.clone();
// main peer acceptance future handling handshake
let hp = h.clone();
let peers_listen = socket.incoming().map_err(From::from).map(move |(conn, _)| {
// aaaand.. reclone for the internal closures
let peers = peers.clone();
let peers2 = peers.clone();
let handshake = handshake.clone();
let hp = hp.clone();
let pool = pool.clone();
future::ok(conn).and_then(move |conn| {
// Refuse connection from banned peers
if let Ok(peer_addr) = conn.peer_addr() {
if peers.is_banned(peer_addr) {
debug!(LOGGER, "Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = conn.shutdown(Shutdown::Both) {
debug!(LOGGER, "Error shutting down conn: {:?}", e);
}
return Err(Error::Banned)
}
}
Ok(conn)
}).and_then(move |conn| {
let total_diff = peers2.total_difficulty();
// accept the peer and add it to the server map
let accept = Peer::accept(
conn,
capab,
total_diff,
&handshake.clone(),
Arc::new(peers2.clone()),
);
let added = add_to_peers(peers2, accept);
// wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(added), &hp);
// run the main peer protocol
timed_peer.and_then(move |(conn, peer)| {
let peer = peer.read().unwrap();
peer.run(conn, pool)
})
})
});
// spawn each peer future to its own task
let hs = h.clone();
let server = peers_listen.for_each(move |peer| {
hs.spawn(peer.then(|res| {
match res {
Err(e) => info!(LOGGER, "Client error: {:?}", e),
_ => {}
}
futures::finished(())
}));
Ok(())
});
// setup the stopping oneshot on the server and join it with the peer future
let (stop, stop_rx) = futures::sync::oneshot::channel();
{
let mut stop_mut = self.stop.borrow_mut();
*stop_mut = Some(stop);
}
// timer to regularly check on our peers by pinging them
let peers_inner = self.peers.clone();
let peers_timer = Timer::default()
.interval(Duration::new(20, 0))
.fold((), move |_, _| {
let total_diff = peers_inner.total_difficulty();
let total_height = peers_inner.total_height();
peers_inner.check_all(total_diff, total_height);
Ok(())
});
Box::new(
server
.select(stop_rx.map_err(|_| Error::ConnectionClose))
.then(|res| match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
})
.select(peers_timer.map_err(|_| Error::Timeout))
.then(|res| match res {
Ok((_, _)) => Ok(()),
Err((e, _)) => Err(e),
}),
)
}
/// Asks the server to connect to a new peer.
pub fn connect_peer(
&self,
addr: SocketAddr,
h: reactor::Handle,
) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> {
if Peer::is_denied(self.config.clone(), addr) {
debug!(LOGGER, "Peer {} denied, not connecting.", addr);
return Box::new(future::err(Error::ConnectionClose));
}
if let Some(p) = self.peers.get_connected_peer(&addr) {
// if we're already connected to the addr, just return the peer
debug!(LOGGER, "connect_peer: already connected {}", addr);
return Box::new(future::ok(Some(p)));
}
debug!(LOGGER, "connect_peer: connecting to {}", addr);
// cloneapalooza
let peers = self.peers.clone();
let handshake = self.handshake.clone();
let capab = self.capabilities.clone();
let pool = self.pool.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port);
let timer = Timer::default();
let socket_connect = timer.timeout(
TcpStream::connect(&addr, &h),
Duration::from_secs(5),
).map_err(|e| {
debug!(LOGGER, "connect_peer: socket connect error - {:?}", e);
Error::Connection(e)
});
let h2 = h.clone();
let request = socket_connect
.and_then(move |socket| {
let total_diff = peers.total_difficulty();
// connect to the peer and add it to the server map, wiring it a timeout for
// the handshake
let connect = Peer::connect(
socket,
capab,
total_diff,
self_addr,
handshake.clone(),
Arc::new(peers.clone()),
);
let added = add_to_peers(peers, connect);
with_timeout(Box::new(added), &h)
})
.and_then(move |(socket, peer)| {
let peer_inner = peer.read().unwrap();
h2.spawn(peer_inner.run(socket, pool).map_err(|e| {
error!(LOGGER, "Peer error: {:?}", e);
()
}));
Ok(Some(peer.clone()))
});
Box::new(request)
}
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(self) {
info!(LOGGER, "calling stop on server");
self.peers.stop();
self.stop.into_inner().unwrap().send(()).unwrap();
}
}
// Adds the peer built by the provided future in the peers map
fn add_to_peers<A>(peers: Peers, peer_fut: A)
-> Box<Future<Item = Result<(TcpStream, Arc<RwLock<Peer>>), ()>, Error = Error>>
where A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static {
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
let apeer = peers.add_connected(peer);
Ok((conn, apeer))
});
Box::new(peer_add)
}
// Adds a timeout to a future
fn with_timeout<T: 'static>(
fut: Box<Future<Item = Result<T, ()>, Error = Error>>,
h: &reactor::Handle,
) -> Box<Future<Item = T, Error = Error>> {
let timeout = reactor::Timeout::new(Duration::from_secs(5), h).unwrap();
let timed = fut.select(timeout.map(Err).from_err())
.then(|res| match res {
Ok((Ok(inner), _timeout)) => {
Ok(inner)
},
Ok((Err(inner), _accept)) => {
debug!(LOGGER, "with_timeout: ok, timeout. nested={:?}", inner);
Err(Error::Timeout)
},
Err((e, _other)) => {
debug!(LOGGER, "with_timeout: err. {:?}", e);
Err(e)
},
});
Box::new(timed)
}

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2016-2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -15,12 +15,7 @@
use std::convert::From;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use futures::Future;
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream;
use tokio_timer::TimerError;
use std::sync::mpsc;
use core::core;
use core::core::hash::Hash;
@ -42,6 +37,8 @@ pub const MAX_PEER_ADDRS: u32 = 256;
pub enum Error {
Serialization(ser::Error),
Connection(io::Error),
/// Header type does not match the expected message type
BadMessage,
Banned,
ConnectionClose,
Timeout,
@ -72,11 +69,16 @@ impl From<io::Error> for Error {
Error::Connection(e)
}
}
impl From<TimerError> for Error {
fn from(_: TimerError) -> Error {
Error::Timeout
impl<T> From<mpsc::SendError<T>> for Error {
fn from(_e: mpsc::SendError<T>) -> Error {
Error::ConnectionClose
}
}
// impl From<TimerError> for Error {
// fn from(_: TimerError) -> Error {
// Error::Timeout
// }
// }
/// Configuration for the peer-to-peer server.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -130,49 +132,6 @@ pub struct PeerInfo {
pub total_difficulty: Difficulty,
}
/// A given communication protocol agreed upon between 2 peers (usually
/// ourselves and a remote) after handshake. This trait is necessary to allow
/// protocol negotiation as it gets upgraded to multiple versions.
pub trait Protocol {
/// Starts handling protocol communication, the connection) is expected to
/// be known already, usually passed during construction. Will typically
/// block so needs to be called withing a coroutine. Should also be called
/// only once.
fn handle(&self, conn: TcpStream, na: Arc<NetAdapter>, addr: SocketAddr, pool: CpuPool)
-> Box<Future<Item = (), Error = Error>>;
/// Sends a ping message to the remote peer.
fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error>;
/// Relays a block to the remote peer.
fn send_block(&self, b: &core::Block) -> Result<(), Error>;
fn send_compact_block(&self, cb: &core::CompactBlock) -> Result<(), Error>;
/// Relays a block header to the remote peer ("header first" propagation).
fn send_header(&self, bh: &core::BlockHeader) -> Result<(), Error>;
/// Relays a transaction to the remote peer.
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>;
/// Sends a request for block headers based on the provided block locator.
fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error>;
/// Sends a request for a block from its hash.
fn send_block_request(&self, h: Hash) -> Result<(), Error>;
fn send_compact_block_request(&self, h: Hash) -> Result<(), Error>;
/// Sends a request for some peer addresses.
fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error>;
/// How many bytes have been sent/received to/from the remote peer.
fn transmitted_bytes(&self) -> (u64, u64);
/// Close the connection to the remote peer.
fn close(&self);
}
/// Bridge between the networking layer and the rest of the system. Handles the
/// forwarding or querying of blocks and transactions from the network among
/// other things.

View file

@ -12,22 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate futures;
extern crate futures_cpupool;
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate tokio_core;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use std::time;
use futures::future::Future;
use futures_cpupool::CpuPool;
use tokio_core::net::TcpStream;
use tokio_core::reactor::{self, Core};
use core::core::target::Difficulty;
use core::core::hash::Hash;
use p2p::Peer;
@ -44,8 +36,6 @@ fn open_port() -> u16 {
// followed by a ping/pong exchange to make sure the connection is live.
#[test]
fn peer_handshake() {
let mut evtlp = Core::new().unwrap();
let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig {
host: "0.0.0.0".parse().unwrap(),
port: open_port(),
@ -53,69 +43,42 @@ fn peer_handshake() {
peers_deny: None,
};
let net_adapter = Arc::new(p2p::DummyAdapter {});
let pool = CpuPool::new(1);
let server = p2p::Server::new(
let server = Arc::new(p2p::Server::new(
".grin".to_owned(),
p2p::Capabilities::UNKNOWN,
p2p_conf.clone(),
net_adapter.clone(),
Hash::from_vec(vec![]),
pool.clone(),
).unwrap();
let run_server = server.start(handle.clone());
).unwrap());
let p2p_inner = server.clone();
let _ = thread::spawn(move || {
p2p_inner.listen()
});
thread::sleep(time::Duration::from_secs(1));
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();
let my_addr = "127.0.0.1:5000".parse().unwrap();
let mut peer = Peer::connect(
&mut socket,
p2p::Capabilities::UNKNOWN,
Difficulty::one(),
my_addr,
&p2p::handshake::Handshake::new(Hash::from_vec(vec![]), p2p_conf.clone()),
net_adapter,
).unwrap();
let phandle = handle.clone();
let rhandle = handle.clone();
let timeout = reactor::Timeout::new(time::Duration::new(1, 0), &handle).unwrap();
let timeout_send = reactor::Timeout::new(time::Duration::new(2, 0), &handle).unwrap();
handle.spawn(
timeout
.from_err()
.and_then(move |_| {
let conf = p2p_conf.clone();
let addr = SocketAddr::new(conf.host, conf.port);
let socket =
TcpStream::connect(&addr, &phandle).map_err(|e| p2p::Error::Connection(e));
socket
.and_then(move |socket| {
Peer::connect(
socket,
p2p::Capabilities::UNKNOWN,
Difficulty::one(),
my_addr,
Arc::new(
p2p::handshake::Handshake::new(
Hash::from_vec(vec![]),
p2p_conf.clone(),
),
),
net_adapter.clone(),
)
})
.and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, pool).map_err(|e| {
panic!("Client run failed: {:?}", e);
}));
peer.send_ping(Difficulty::one(), 0).unwrap();
timeout_send.from_err().map(|_| peer)
})
.and_then(|peer| {
let (sent, recv) = peer.transmitted_bytes();
assert!(sent > 0);
assert!(recv > 0);
Ok(())
})
.and_then(|_| {
assert!(server.peers.peer_count() > 0);
server.stop();
Ok(())
})
})
.map_err(|e| {
panic!("Client connection failed: {:?}", e);
}),
);
peer.start(socket);
thread::sleep(time::Duration::from_secs(1));
evtlp.run(run_server).unwrap();
peer.send_ping(Difficulty::one(), 0).unwrap();
thread::sleep(time::Duration::from_secs(1));
let server_peer = server.peers.get_connected_peer(&my_addr).unwrap();
let server_peer = server_peer.read().unwrap();
assert_eq!(server_peer.info.total_difficulty, Difficulty::one());
assert!(server.peers.peer_count() > 0);
}