Discovery and seeding of other peers. Relies either on a gist with IP addresses or a static list. Connects to a first list, sending peer request messages to discover more. Monitor the server to make sure there's always enough peer connections.

This commit is contained in:
Ignotus Peverell 2017-02-18 18:42:34 -08:00
parent 510feadce6
commit eb024e91d2
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
16 changed files with 554 additions and 90 deletions

View file

@ -13,7 +13,10 @@ secp256k1zkp = { path = "../secp256k1zkp" }
env_logger="^0.3.5"
futures = "^0.1.9"
futures-cpupool = "^0.1.3"
hyper = { git = "https://github.com/hyperium/hyper" }
log = "^0.3"
time = "^0.1"
tokio-core="^0.1.1"
tokio-timer="^0.1.0"
rand = "^0.3"

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::thread;
@ -20,7 +21,7 @@ use chain::{self, ChainAdapter};
use core::core;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use p2p::{self, NetAdapter, Server};
use p2p::{self, NetAdapter, Server, PeerStore, PeerData, Capabilities, State};
use util::OneTime;
use store;
use sync;
@ -33,6 +34,7 @@ pub struct NetToChainAdapter {
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>,
peer_store: Arc<PeerStore>,
syncer: OneTime<Arc<sync::Syncer>>,
}
@ -149,6 +151,7 @@ impl NetAdapter for NetToChainAdapter {
headers
}
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block> {
let store = self.chain_store.clone();
let b = store.get_block(&h);
@ -157,17 +160,62 @@ impl NetAdapter for NetToChainAdapter {
_ => None,
}
}
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> {
let peers = self.peer_store.find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize);
debug!("Got {} peer addrs to send.", peers.len());
map_vec!(peers, |p| p.addr)
}
/// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {
debug!("Received {} peer addrs, saving.", peer_addrs.len());
for pa in peer_addrs {
if let Ok(e) = self.peer_store.exists_peer(pa) {
if e {
continue;
}
}
let peer = PeerData {
addr: pa,
capabilities: p2p::UNKNOWN,
user_agent: "".to_string(),
flags: State::Healthy,
};
if let Err(e) = self.peer_store.save_peer(&peer) {
error!("Could not save received peer address: {:?}", e);
}
}
}
/// Network successfully connected to a peer.
fn peer_connected(&self, pi: &p2p::PeerInfo) {
debug!("Saving newly connected peer {}.", pi.addr);
let peer = PeerData {
addr: pi.addr,
capabilities: pi.capabilities,
user_agent: pi.user_agent.clone(),
flags: State::Healthy,
};
if let Err(e) = self.peer_store.save_peer(&peer) {
error!("Could not save connected peer: {:?}", e);
}
}
}
impl NetToChainAdapter {
pub fn new(chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>)
chain_adapter: Arc<ChainToNetAdapter>,
peer_store: Arc<PeerStore>)
-> NetToChainAdapter {
NetToChainAdapter {
chain_head: chain_head,
chain_store: chain_store,
chain_adapter: chain_adapter,
peer_store: peer_store,
syncer: OneTime::new(),
}
}

View file

@ -25,11 +25,15 @@
extern crate log;
extern crate env_logger;
extern crate futures;
extern crate futures_cpupool as cpupool;
extern crate hyper;
extern crate rand;
extern crate time;
extern crate tokio_core;
extern crate tokio_timer;
extern crate grin_chain as chain;
#[macro_use]
extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate grin_store as store;
@ -39,6 +43,7 @@ extern crate secp256k1zkp as secp;
mod adapters;
mod miner;
mod server;
mod seed;
mod sync;
pub use server::{Server, ServerConfig};
pub use server::{Server, ServerConfig, Seeding};

220
grin/src/seed.rs Normal file
View file

@ -0,0 +1,220 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rand::{thread_rng, Rng};
use std::cmp::min;
use std::net::SocketAddr;
use std::ops::Deref;
use std::str;
use std::sync::Arc;
use std::thread;
use std::time;
use cpupool;
use futures::{self, future, Future, Stream};
use futures::sync::mpsc;
use hyper;
use tokio_core::reactor;
use tokio_timer::Timer;
use p2p;
const PEER_MAX_COUNT: u32 = 25;
const PEER_PREFERRED_COUNT: u32 = 8;
const GIST_SEEDS_URL: &'static str = "";
pub struct Seeder {
peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>,
capabilities: p2p::Capabilities,
}
impl Seeder {
pub fn new(capabilities: p2p::Capabilities,
peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>)
-> Seeder {
Seeder {
peer_store: peer_store,
p2p: p2p,
capabilities: capabilities,
}
}
pub fn connect_and_monitor(&self,
h: reactor::Handle,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>) {
// open a channel with a listener that connects every peer address sent below
// max peer count
let (tx, rx) = futures::sync::mpsc::unbounded();
h.spawn(self.listen_for_addrs(h.clone(), rx));
// check seeds and start monitoring connections
let seeder = self.connect_to_seeds(tx.clone(), seed_list)
.join(self.monitor_peers(tx.clone()));
h.spawn(seeder.map(|_| ()).map_err(|_| ()));
}
fn monitor_peers(&self,
tx: mpsc::UnboundedSender<SocketAddr>)
-> Box<Future<Item = (), Error = String>> {
let peer_store = self.peer_store.clone();
let p2p_server = self.p2p.clone();
// now spawn a new future to regularly check if we need to acquire more peers
// and if so, gets them from db
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(10))
.for_each(move |_| {
if p2p_server.peer_count() < PEER_PREFERRED_COUNT {
let mut peers = peer_store.find_peers(p2p::State::Healthy,
p2p::UNKNOWN,
(2 * PEER_MAX_COUNT) as usize);
debug!("Got {} more peers from db, trying to connect.", peers.len());
thread_rng().shuffle(&mut peers[..]);
let sz = min(PEER_PREFERRED_COUNT as usize, peers.len());
for p in &peers[0..sz] {
tx.send(p.addr).unwrap();
}
}
Ok(())
// TODO clean disconnected server peers
})
.map_err(|e| e.to_string());
Box::new(mon_loop)
}
// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
fn connect_to_seeds(&self,
tx: mpsc::UnboundedSender<SocketAddr>,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>)
-> Box<Future<Item = (), Error = String>> {
let peer_store = self.peer_store.clone();
// a thread pool is required so we don't block the event loop with a
// db query
let thread_pool = cpupool::CpuPool::new(1);
let seeder = thread_pool.spawn_fn(move || {
// check if we have some peers in db
Ok(peer_store.find_peers(p2p::State::Healthy,
p2p::FULL_HIST,
(2 * PEER_MAX_COUNT) as usize))
})
.and_then(|mut peers| {
// if so, get their addresses, otherwise use our seeds
if peers.len() > 0 {
thread_rng().shuffle(&mut peers[..]);
Box::new(future::ok(peers.iter().map(|p| p.addr).collect::<Vec<_>>()))
} else {
seed_list
}
})
.and_then(move |peer_addrs| {
// connect to this first set of addresses
let sz = min(PEER_PREFERRED_COUNT as usize, peer_addrs.len());
for addr in &peer_addrs[0..sz] {
debug!("Connecting to seed: {}.", addr);
tx.send(*addr).unwrap();
}
Ok(())
});
Box::new(seeder)
}
/// Builds a future to continuously listen on a channel receiver for new
/// addresses to and initiate a connection if the max peer count isn't
/// exceeded. A request for more peers is also automatically sent after
/// connection.
fn listen_for_addrs(&self,
h: reactor::Handle,
rx: mpsc::UnboundedReceiver<SocketAddr>)
-> Box<Future<Item = (), Error = ()>> {
let capab = self.capabilities;
let p2p_server = self.p2p.clone();
let listener = rx.for_each(move |peer_addr| {
debug!("New peer address to connect to: {}.", peer_addr);
let inner_h = h.clone();
if p2p_server.peer_count() < PEER_MAX_COUNT {
connect_and_req(capab, p2p_server.clone(), inner_h, peer_addr)
} else {
Box::new(future::ok(()))
}
});
Box::new(listener)
}
}
/// Extract the list of seeds from a pre-defined gist. Easy method until we
/// have a set of DNS names we can rely on.
pub fn gist_seeds(h: reactor::Handle) -> Box<Future<Item = Vec<SocketAddr>, Error = String>> {
let url = hyper::Url::parse(&GIST_SEEDS_URL).unwrap();
let seeds = future::ok(()).and_then(move |_| {
let client = hyper::Client::new(&h);
// http get, filtering out non 200 results
client.get(url)
.map_err(|e| e.to_string())
.and_then(|res| {
if *res.status() != hyper::Ok {
return Err(format!("Gist request failed: {}", res.status()));
}
Ok(res)
})
.and_then(|res| {
// collect all chunks and split around whitespace to get a list of SocketAddr
res.body().collect().map_err(|e| e.to_string()).and_then(|chunks| {
let res = chunks.iter().fold("".to_string(), |acc, ref chunk| {
acc + str::from_utf8(&chunk[..]).unwrap()
});
let addrs =
res.split_whitespace().map(|s| s.parse().unwrap()).collect::<Vec<_>>();
Ok(addrs)
})
})
});
Box::new(seeds)
}
/// Convenience function when the seed list is immediately known. Mostly used
/// for tests.
pub fn predefined_seeds(addrs_str: Vec<String>)
-> Box<Future<Item = Vec<SocketAddr>, Error = String>> {
let seeds = future::ok(())
.and_then(move |_| Ok(addrs_str.iter().map(|s| s.parse().unwrap()).collect::<Vec<_>>()));
Box::new(seeds)
}
fn connect_and_req(capab: p2p::Capabilities,
p2p: Arc<p2p::Server>,
h: reactor::Handle,
addr: SocketAddr)
-> Box<Future<Item = (), Error = ()>> {
let fut = p2p.connect_peer(addr, h)
.and_then(move |p| {
if let Some(p) = p {
p.send_peer_request(capab);
}
Ok(())
})
.map_err(|e| {
error!("Peer request error {:?}", e);
()
});
Box::new(fut)
}

View file

@ -20,7 +20,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use futures::Future;
use futures::{future, Future};
use tokio_core::reactor;
use adapters::{NetToChainAdapter, ChainToNetAdapter};
@ -29,6 +29,7 @@ use chain::ChainStore;
use core;
use miner;
use p2p;
use seed;
use store;
use sync;
@ -44,14 +45,39 @@ pub enum Error {
StoreErr(store::Error),
}
impl From<store::Error> for Error {
fn from(e: store::Error) -> Error {
Error::StoreErr(e)
}
}
/// Type of seeding the server will use to find other peers on the network.
#[derive(Debug, Clone)]
pub enum Seeding {
/// No seeding, mostly for tests that programmatically connect
None,
/// A list of seed addresses provided to the server
List(Vec<String>),
/// Automatically download a gist with a list of server addresses
Gist,
}
/// Full server configuration, aggregating configurations required for the
/// different components.
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Directory under which the rocksdb stores will be created
pub db_root: String,
/// Allows overriding the default cuckoo cycle size
pub cuckoo_size: u8,
/// Capabilities expose by this node, also conditions which other peers this
/// node will have an affinity toward when connection.
pub capabilities: p2p::Capabilities,
pub seeding_type: Seeding,
/// Configuration for the peer-to-peer server
pub p2p_config: p2p::P2PConfig,
}
@ -61,6 +87,8 @@ impl Default for ServerConfig {
ServerConfig {
db_root: ".grin".to_string(),
cuckoo_size: 0,
capabilities: p2p::FULL_NODE,
seeding_type: Seeding::None,
p2p_config: p2p::P2PConfig::default(),
}
}
@ -68,7 +96,7 @@ impl Default for ServerConfig {
/// Grin server holding internal structures.
pub struct Server {
config: ServerConfig,
pub config: ServerConfig,
evt_handle: reactor::Handle,
/// handle to our network server
p2p: Arc<p2p::Server>,
@ -84,32 +112,10 @@ pub struct Server {
impl Server {
/// Instantiates and starts a new server.
pub fn start(config: ServerConfig) -> Result<Server, Error> {
let (chain_store, head) = try!(store_head(&config));
let shared_head = Arc::new(Mutex::new(head));
let chain_adapter = Arc::new(ChainToNetAdapter::new());
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone()));
chain_adapter.init(server.clone());
let sync = sync::Syncer::new(chain_store.clone(), server.clone());
net_adapter.start_sync(sync);
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
evtlp.run(server.start(handle.clone())).unwrap();
warn!("Grin server started.");
Ok(Server {
config: config,
evt_handle: handle.clone(),
p2p: server,
chain_head: shared_head,
chain_store: chain_store,
chain_adapter: chain_adapter,
})
let serv = Server::future(config, &evtlp.handle());
evtlp.run(future::ok::<(), ()>(())).unwrap();
serv
}
/// Instantiates a new server associated with the provided future reactor.
@ -117,13 +123,28 @@ impl Server {
let (chain_store, head) = try!(store_head(&config));
let shared_head = Arc::new(Mutex::new(head));
let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?);
let chain_adapter = Arc::new(ChainToNetAdapter::new());
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone()));
chain_adapter.clone(),
peer_store.clone()));
let server =
Arc::new(p2p::Server::new(config.capabilities, config.p2p_config, net_adapter.clone()));
chain_adapter.init(server.clone());
let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), server.clone());
match config.seeding_type.clone() {
Seeding::None => {}
Seeding::List(seeds) => {
seed.connect_and_monitor(evt_handle.clone(), seed::predefined_seeds(seeds));
}
Seeding::Gist => {
seed.connect_and_monitor(evt_handle.clone(), seed::gist_seeds(evt_handle.clone()));
}
}
let sync = sync::Syncer::new(chain_store.clone(), server.clone());
net_adapter.start_sync(sync);
@ -143,10 +164,14 @@ impl Server {
/// Asks the server to connect to a peer at the provided network address.
pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> {
let handle = self.evt_handle.clone();
handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ()));
handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map(|_| ()).map_err(|_| ()));
Ok(())
}
pub fn peer_count(&self) -> u32 {
self.p2p.peer_count()
}
/// Start mining for blocks on a separate thread. Relies on a toy miner,
/// mostly for testing.
pub fn start_miner(&self) {

View file

@ -20,15 +20,20 @@ extern crate grin_chain as chain;
extern crate env_logger;
extern crate futures;
extern crate tokio_core;
extern crate tokio_timer;
use std::io;
use std::thread;
use std::time;
use std::default::Default;
use futures::{Future, Poll, Async};
use futures::task::park;
use tokio_core::reactor;
use tokio_timer::Timer;
/// Create a network of 5 servers and mine a block, verifying that the block
/// gets propagated to all.
#[test]
fn simulate_block_propagation() {
env_logger::init();
@ -43,7 +48,8 @@ fn simulate_block_propagation() {
grin::ServerConfig{
db_root: format!("target/grin-prop-{}", n),
cuckoo_size: 12,
p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()}
p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()},
..Default::default()
}, &handle).unwrap();
servers.push(s);
}
@ -69,6 +75,8 @@ fn simulate_block_propagation() {
}));
}
/// Creates 2 different disconnected servers, mine a few blocks on one, connect
/// them and check that the 2nd gets all the blocks
#[test]
fn simulate_full_sync() {
env_logger::init();
@ -83,7 +91,8 @@ fn simulate_full_sync() {
grin::ServerConfig{
db_root: format!("target/grin-sync-{}", n),
cuckoo_size: 12,
p2p_config: p2p::P2PConfig{port: 11000+n, ..p2p::P2PConfig::default()}
p2p_config: p2p::P2PConfig{port: 11000+n, ..p2p::P2PConfig::default()},
..Default::default()
}, &handle).unwrap();
servers.push(s);
}
@ -100,6 +109,39 @@ fn simulate_full_sync() {
evtlp.run(change(&servers[1]));
}
/// Creates 5 servers, one being a seed and check that through peer address
/// messages they all end up connected.
#[test]
fn simulate_seeding() {
env_logger::init();
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
// instantiates 5 servers on different ports, with 0 as a seed
let mut servers = vec![];
for n in 0..5 {
let s = grin::Server::future(
grin::ServerConfig{
db_root: format!("target/grin-seed-{}", n),
cuckoo_size: 12,
p2p_config: p2p::P2PConfig{port: 12000+n, ..p2p::P2PConfig::default()},
seeding_type: grin::Seeding::List(vec!["127.0.0.1:10000".to_string()]),
..Default::default()
}, &handle).unwrap();
servers.push(s);
}
// wait a bit and check all servers are now connected
evtlp.run(Timer::default().sleep(time::Duration::from_secs(30)).and_then(|_| {
for s in servers {
// occasionally 2 peers will connect to each other at the same time
assert!(s.peer_count() >= 4);
}
Ok(())
}));
}
// Builds the change future, monitoring for a change of head on the provided server
fn change<'a>(s: &'a grin::Server) -> HeadChange<'a> {
let start_head = s.head();

View file

@ -223,7 +223,7 @@ impl Connection {
pub struct TimeoutConnection {
underlying: Connection,
expected_responses: Arc<Mutex<Vec<(Type, Hash, Option<bool>, Instant)>>>,
expected_responses: Arc<Mutex<Vec<(Type, Option<Hash>, Instant)>>>,
}
impl TimeoutConnection {
@ -244,15 +244,13 @@ impl TimeoutConnection {
let recv_h = try!(handler.handle(sender, header, data));
let mut expects = exp.lock().unwrap();
println!("EXP1 {}", expects.len());
let filtered = expects.iter()
.filter(|&&(typ, h, _, _)| {
msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h
.filter(|&&(typ, h, _): &&(Type, Option<Hash>, Instant)| {
msg_type != typ || h.is_some() && recv_h != h
})
.map(|&x| x)
.collect::<Vec<_>>();
*expects = filtered;
println!("EXP2 {}", expects.len());
Ok(recv_h)
});
@ -263,7 +261,7 @@ impl TimeoutConnection {
.interval(Duration::new(2, 0))
.fold((), move |_, _| {
let exp = exp.lock().unwrap();
for &(_, _, _, t) in exp.deref() {
for &(_, _, t) in exp.deref() {
if Instant::now() - t > Duration::new(2, 0) {
return Err(TimerError::TooLong);
}
@ -283,17 +281,14 @@ impl TimeoutConnection {
/// optionally the hash of the sent data.
pub fn send_request(&self,
t: Type,
rt: Type,
body: &ser::Writeable,
expect_h: Option<(Type, Hash)>)
expect_h: Option<(Hash)>)
-> Result<(), ser::Error> {
let sent = try!(self.underlying.send_msg(t, body));
let mut expects = self.expected_responses.lock().unwrap();
if let Some((rt, h)) = expect_h {
expects.push((rt, h, None, Instant::now()));
} else {
expects.push((t, ZERO_HASH, None, Instant::now()));
}
expects.push((rt, expect_h, Instant::now()));
Ok(())
}

View file

@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use futures::Future;
@ -47,17 +48,19 @@ impl Handshake {
/// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect(&self,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: SocketAddr,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
// prepare the first part of the hanshake
let nonce = self.next_nonce();
let hand = Hand {
version: PROTOCOL_VERSION,
capabilities: FULL_HIST,
capabilities: capab,
nonce: nonce,
total_difficulty: total_difficulty,
sender_addr: SockAddr(conn.local_addr().unwrap()),
sender_addr: SockAddr(self_addr),
receiver_addr: SockAddr(conn.peer_addr().unwrap()),
user_agent: USER_AGENT.to_string(),
};
@ -90,6 +93,7 @@ impl Handshake {
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake(&self,
capab: Capabilities,
total_difficulty: Difficulty,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
@ -116,20 +120,21 @@ impl Handshake {
let peer_info = PeerInfo {
capabilities: hand.capabilities,
user_agent: hand.user_agent,
addr: conn.peer_addr().unwrap(),
addr: hand.sender_addr.0,
version: hand.version,
total_difficulty: hand.total_difficulty,
};
// send our reply with our info
let shake = Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_HIST,
capabilities: capab,
total_difficulty: total_difficulty,
user_agent: USER_AGENT.to_string(),
};
Ok((conn, shake, peer_info))
})
.and_then(|(conn, shake, peer_info)| {
debug!("Success handshake with {}.", peer_info.addr);
write_msg(conn, shake, Type::Shake)
// when more than one protocol version is supported, choosing should go here
.map(|conn| (conn, ProtocolV1::new(), peer_info))

View file

@ -44,9 +44,11 @@ mod msg;
mod peer;
mod protocol;
mod server;
pub mod store;
mod store;
mod types;
pub use server::{Server, DummyAdapter};
pub use peer::Peer;
pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS};
pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS,
Capabilities, UNKNOWN, FULL_NODE, FULL_HIST, PeerInfo};
pub use store::{PeerStore, PeerData, State};

View file

@ -311,10 +311,16 @@ impl Writeable for PeerAddrs {
impl Readable<PeerAddrs> for PeerAddrs {
fn read(reader: &mut Reader) -> Result<PeerAddrs, ser::Error> {
let peer_count = try!(reader.read_u32());
if peer_count > 1000 {
if peer_count > MAX_PEER_ADDRS {
return Err(ser::Error::TooLargeReadErr);
} else if peer_count == 0 {
return Ok(PeerAddrs { peers: vec![] });
}
// let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader));
let mut peers = Vec::with_capacity(peer_count as usize);
for _ in 0..peer_count {
peers.push(SockAddr::read(reader)?);
}
let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader));
Ok(PeerAddrs { peers: peers })
}
}

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::sync::Arc;
use futures::Future;
@ -35,31 +36,36 @@ unsafe impl Send for Peer {}
impl Peer {
/// Initiates the handshake with another peer.
pub fn connect(conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: SocketAddr,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(total_difficulty, conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: Box::new(proto),
}))
});
let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn)
.and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: Box::new(proto),
}))
});
Box::new(connect_peer)
}
/// Accept a handshake initiated by another peer.
pub fn accept(conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let hs_peer = hs.handshake(total_difficulty, conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: Box::new(proto),
}))
});
let hs_peer = hs.handshake(capab, total_difficulty, conn)
.and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
proto: Box::new(proto),
}))
});
Box::new(hs_peer)
}
@ -102,6 +108,11 @@ impl Peer {
self.proto.send_block_request(h)
}
pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
debug!("Asking {} for more peers.", self.info.addr);
self.proto.send_peer_request(capab)
}
pub fn stop(&self) {
self.proto.close();
}

View file

@ -68,7 +68,7 @@ impl Protocol for ProtocolV1 {
/// Sends a ping message to the remote peer. Will panic if handle has never
/// been called on this protocol.
fn send_ping(&self) -> Result<(), ser::Error> {
self.send_request(Type::Ping, &Empty {}, None)
self.send_request(Type::Ping, Type::Pong, &Empty {}, None)
}
/// Serializes and sends a block to our remote peer
@ -82,11 +82,21 @@ impl Protocol for ProtocolV1 {
}
fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), ser::Error> {
self.send_request(Type::GetHeaders, &Locator { hashes: locator }, None)
self.send_request(Type::GetHeaders,
Type::Headers,
&Locator { hashes: locator },
None)
}
fn send_block_request(&self, h: Hash) -> Result<(), ser::Error> {
self.send_request(Type::GetBlock, &h, Some((Type::Block, h)))
self.send_request(Type::GetBlock, Type::Block, &h, Some(h))
}
fn send_peer_request(&self, capab: Capabilities) -> Result<(), ser::Error> {
self.send_request(Type::GetPeerAddrs,
Type::PeerAddrs,
&GetPeerAddrs { capabilities: capab },
None)
}
/// Close the connection to the remote peer
@ -102,10 +112,11 @@ impl ProtocolV1 {
fn send_request(&self,
t: Type,
rt: Type,
body: &ser::Writeable,
expect_resp: Option<(Type, Hash)>)
expect_resp: Option<Hash>)
-> Result<(), ser::Error> {
self.conn.borrow().send_request(t, body, expect_resp)
self.conn.borrow().send_request(t, rt, body, expect_resp)
}
}
@ -168,6 +179,29 @@ fn handle_payload(adapter: &NetAdapter,
adapter.headers_received(headers.headers);
Ok(None)
}
Type::GetPeerAddrs => {
let get_peers = ser::deserialize::<GetPeerAddrs>(&mut &buf[..])?;
let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities);
// serialize and send all the headers over
let mut body_data = vec![];
try!(ser::serialize(&mut body_data,
&PeerAddrs {
peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(),
}));
let mut data = vec![];
try!(ser::serialize(&mut data,
&MsgHeader::new(Type::PeerAddrs, body_data.len() as u64)));
data.append(&mut body_data);
sender.send(data);
Ok(None)
}
Type::PeerAddrs => {
let peer_addrs = ser::deserialize::<PeerAddrs>(&mut &buf[..])?;
adapter.peer_addrs_received(peer_addrs.peers.iter().map(|pa| pa.0).collect());
Ok(None)
}
_ => {
debug!("unknown message type {:?}", header.msg_type);
Ok(None)

View file

@ -23,7 +23,7 @@ use std::time::Duration;
use futures;
use futures::{Future, Stream};
use futures::future::IntoFuture;
use futures::future::{self, IntoFuture};
use rand::{self, Rng};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
@ -51,12 +51,18 @@ impl NetAdapter for DummyAdapter {
fn get_block(&self, h: Hash) -> Option<core::Block> {
None
}
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr> {
vec![]
}
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {}
fn peer_connected(&self, pi: &PeerInfo) {}
}
/// P2P server implementation, handling bootstrapping to find and connect to
/// peers, receiving connections from other peers and keep track of all of them.
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
peers: Arc<RwLock<Vec<Arc<Peer>>>>,
adapter: Arc<NetAdapter>,
stop: RefCell<Option<futures::sync::oneshot::Sender<()>>>,
@ -68,9 +74,10 @@ unsafe impl Send for Server {}
// TODO TLS
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(config: P2PConfig, adapter: Arc<NetAdapter>) -> Server {
pub fn new(capab: Capabilities, config: P2PConfig, adapter: Arc<NetAdapter>) -> Server {
Server {
config: config,
capabilities: capab,
peers: Arc::new(RwLock::new(Vec::new())),
adapter: adapter,
stop: RefCell::new(None),
@ -87,6 +94,7 @@ impl Server {
let hs = Arc::new(Handshake::new());
let peers = self.peers.clone();
let adapter = self.adapter.clone();
let capab = self.capabilities.clone();
// main peer acceptance future handling handshake
let hp = h.clone();
@ -96,7 +104,9 @@ impl Server {
let peers = peers.clone();
// accept the peer and add it to the server map
let peer_accept = add_to_peers(peers, Peer::accept(conn, total_diff, &hs.clone()));
let peer_accept = add_to_peers(peers,
adapter.clone(),
Peer::accept(conn, capab, total_diff, &hs.clone()));
// wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(peer_accept), &hp);
@ -136,23 +146,49 @@ impl Server {
pub fn connect_peer(&self,
addr: SocketAddr,
h: reactor::Handle)
-> Box<Future<Item = (), Error = Error>> {
-> Box<Future<Item = Option<Arc<Peer>>, Error = Error>> {
for p in self.peers.read().unwrap().deref() {
// if we're already connected to the addr, just return the peer
if p.info.addr == addr {
return Box::new(future::ok(Some((*p).clone())));
}
}
// asked to connect to ourselves
if addr.ip() == self.config.host && addr.port() == self.config.port {
return Box::new(future::ok(None));
}
let peers = self.peers.clone();
let adapter1 = self.adapter.clone();
let adapter2 = self.adapter.clone();
let capab = self.capabilities.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port);
debug!("{} connecting to {}", self_addr, addr);
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e));
let h2 = h.clone();
let request = socket.and_then(move |socket| {
let peers = peers.clone();
let total_diff = adapter1.total_difficulty();
let total_diff = adapter1.clone().total_difficulty();
// connect to the peer and add it to the server map, wiring it a timeout for
// the handhake
let peer_connect =
add_to_peers(peers, Peer::connect(socket, total_diff, &Handshake::new()));
let peer_connect = add_to_peers(peers,
adapter1,
Peer::connect(socket,
capab,
total_diff,
self_addr,
&Handshake::new()));
with_timeout(Box::new(peer_connect), &h)
})
.and_then(move |(socket, peer)| peer.run(socket, adapter2));
.and_then(move |(socket, peer)| {
h2.spawn(peer.run(socket, adapter2).map_err(|e| {
error!("Peer error: {:?}", e);
()
}));
Ok(Some(peer))
});
Box::new(request)
}
@ -212,11 +248,13 @@ impl Server {
// Adds the peer built by the provided future in the peers map
fn add_to_peers<A>(peers: Arc<RwLock<Vec<Arc<Peer>>>>,
adapter: Arc<NetAdapter>,
peer_fut: A)
-> Box<Future<Item = Result<(TcpStream, Arc<Peer>), ()>, Error = Error>>
where A: IntoFuture<Item = (TcpStream, Peer), Error = Error> + 'static
{
let peer_add = peer_fut.into_future().map(move |(conn, peer)| {
adapter.peer_connected(&peer.info);
let apeer = Arc::new(peer);
let mut peers = peers.write().unwrap();
peers.push(apeer.clone());

View file

@ -36,14 +36,14 @@ enum_from_primitive! {
}
}
pub struct Peer {
pub struct PeerData {
pub addr: SocketAddr,
pub capabilities: Capabilities,
pub user_agent: String,
pub flags: State,
}
impl Writeable for Peer {
impl Writeable for PeerData {
fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> {
SockAddr(self.addr).write(writer)?;
ser_multiwrite!(writer,
@ -54,15 +54,15 @@ impl Writeable for Peer {
}
}
impl Readable<Peer> for Peer {
fn read(reader: &mut Reader) -> Result<Peer, ser::Error> {
impl Readable<PeerData> for PeerData {
fn read(reader: &mut Reader) -> Result<PeerData, ser::Error> {
let addr = SockAddr::read(reader)?;
let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8);
let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?;
let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?;
match State::from_u8(fl) {
Some(flags) => {
Ok(Peer {
Ok(PeerData {
addr: addr.0,
capabilities: capabilities,
user_agent: user_agent,
@ -84,18 +84,22 @@ impl PeerStore {
Ok(PeerStore { db: db })
}
pub fn save_peer(&self, p: &Peer) -> Result<(), Error> {
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..],
p)
}
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
self.db.exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..])
}
pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> {
self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..])
}
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<Peer> {
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
let peers_iter = self.db
.iter::<Peer>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()));
.iter::<PeerData>(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes()));
let mut peers = Vec::with_capacity(count);
for p in peers_iter {
if p.flags == state && p.capabilities.contains(cap) {

View file

@ -32,6 +32,9 @@ pub const MAX_BLOCK_HEADERS: u32 = 512;
/// Maximum number of block bodies a peer should ever ask for and send
pub const MAX_BLOCK_BODIES: u32 = 16;
/// Maximum number of peer addresses a peer should ever send
pub const MAX_PEER_ADDRS: u32 = 256;
/// Configuration for the peer-to-peer server.
#[derive(Debug, Clone, Copy)]
pub struct P2PConfig {
@ -60,6 +63,10 @@ bitflags! {
/// Can provide block headers and the UTXO set for some recent-enough
/// height.
const UTXO_HIST = 0b00000010,
/// Can provide a list of healthy peers
const PEER_LIST = 0b00000100,
const FULL_NODE = FULL_HIST.bits | UTXO_HIST.bits | PEER_LIST.bits,
}
}
@ -101,6 +108,9 @@ pub trait Protocol {
/// Sends a request for a block from its hash.
fn send_block_request(&self, h: Hash) -> Result<(), Error>;
/// Sends a request for some peer addresses.
fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error>;
/// How many bytes have been sent/received to/from the remote peer.
fn transmitted_bytes(&self) -> (u64, u64);
@ -133,4 +143,14 @@ pub trait NetAdapter: Sync + Send {
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block>;
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<SocketAddr>;
/// A list of peers has been received from one of our peers.
fn peer_addrs_received(&self, Vec<SocketAddr>);
/// Network successfully connected to a peer.
fn peer_connected(&self, &PeerInfo);
}

View file

@ -130,6 +130,12 @@ impl Store {
}
}
/// Whether the provided key exists
pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
let db = self.rdb.read().unwrap();
db.get(key).map(|r| r.is_some()).map_err(From::from)
}
/// Deletes a key/value pair from the db
pub fn delete(&self, key: &[u8]) -> Result<(), Error> {
let db = self.rdb.write().unwrap();