diff --git a/grin/Cargo.toml b/grin/Cargo.toml index 8eb6f3bcb..299cd267d 100644 --- a/grin/Cargo.toml +++ b/grin/Cargo.toml @@ -13,7 +13,10 @@ secp256k1zkp = { path = "../secp256k1zkp" } env_logger="^0.3.5" futures = "^0.1.9" +futures-cpupool = "^0.1.3" +hyper = { git = "https://github.com/hyperium/hyper" } log = "^0.3" time = "^0.1" tokio-core="^0.1.1" +tokio-timer="^0.1.0" rand = "^0.3" diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index a30aaeb72..4c0f0d323 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::net::SocketAddr; use std::ops::Deref; use std::sync::{Arc, Mutex}; use std::thread; @@ -20,7 +21,7 @@ use chain::{self, ChainAdapter}; use core::core; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; -use p2p::{self, NetAdapter, Server}; +use p2p::{self, NetAdapter, Server, PeerStore, PeerData, Capabilities, State}; use util::OneTime; use store; use sync; @@ -33,6 +34,7 @@ pub struct NetToChainAdapter { chain_head: Arc>, chain_store: Arc, chain_adapter: Arc, + peer_store: Arc, syncer: OneTime>, } @@ -149,6 +151,7 @@ impl NetAdapter for NetToChainAdapter { headers } + /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option { let store = self.chain_store.clone(); let b = store.get_block(&h); @@ -157,17 +160,62 @@ impl NetAdapter for NetToChainAdapter { _ => None, } } + + /// Find good peers we know with the provided capability and return their + /// addresses. + fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec { + let peers = self.peer_store.find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize); + debug!("Got {} peer addrs to send.", peers.len()); + map_vec!(peers, |p| p.addr) + } + + /// A list of peers has been received from one of our peers. + fn peer_addrs_received(&self, peer_addrs: Vec) { + debug!("Received {} peer addrs, saving.", peer_addrs.len()); + for pa in peer_addrs { + if let Ok(e) = self.peer_store.exists_peer(pa) { + if e { + continue; + } + } + let peer = PeerData { + addr: pa, + capabilities: p2p::UNKNOWN, + user_agent: "".to_string(), + flags: State::Healthy, + }; + if let Err(e) = self.peer_store.save_peer(&peer) { + error!("Could not save received peer address: {:?}", e); + } + } + } + + /// Network successfully connected to a peer. + fn peer_connected(&self, pi: &p2p::PeerInfo) { + debug!("Saving newly connected peer {}.", pi.addr); + let peer = PeerData { + addr: pi.addr, + capabilities: pi.capabilities, + user_agent: pi.user_agent.clone(), + flags: State::Healthy, + }; + if let Err(e) = self.peer_store.save_peer(&peer) { + error!("Could not save connected peer: {:?}", e); + } + } } impl NetToChainAdapter { pub fn new(chain_head: Arc>, chain_store: Arc, - chain_adapter: Arc) + chain_adapter: Arc, + peer_store: Arc) -> NetToChainAdapter { NetToChainAdapter { chain_head: chain_head, chain_store: chain_store, chain_adapter: chain_adapter, + peer_store: peer_store, syncer: OneTime::new(), } } diff --git a/grin/src/lib.rs b/grin/src/lib.rs index a3b459dc2..7b197001d 100644 --- a/grin/src/lib.rs +++ b/grin/src/lib.rs @@ -25,11 +25,15 @@ extern crate log; extern crate env_logger; extern crate futures; +extern crate futures_cpupool as cpupool; +extern crate hyper; extern crate rand; extern crate time; extern crate tokio_core; +extern crate tokio_timer; extern crate grin_chain as chain; +#[macro_use] extern crate grin_core as core; extern crate grin_p2p as p2p; extern crate grin_store as store; @@ -39,6 +43,7 @@ extern crate secp256k1zkp as secp; mod adapters; mod miner; mod server; +mod seed; mod sync; -pub use server::{Server, ServerConfig}; +pub use server::{Server, ServerConfig, Seeding}; diff --git a/grin/src/seed.rs b/grin/src/seed.rs new file mode 100644 index 000000000..77c5a56d0 --- /dev/null +++ b/grin/src/seed.rs @@ -0,0 +1,220 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use rand::{thread_rng, Rng}; +use std::cmp::min; +use std::net::SocketAddr; +use std::ops::Deref; +use std::str; +use std::sync::Arc; +use std::thread; +use std::time; + +use cpupool; +use futures::{self, future, Future, Stream}; +use futures::sync::mpsc; +use hyper; +use tokio_core::reactor; +use tokio_timer::Timer; + +use p2p; + +const PEER_MAX_COUNT: u32 = 25; +const PEER_PREFERRED_COUNT: u32 = 8; +const GIST_SEEDS_URL: &'static str = ""; + +pub struct Seeder { + peer_store: Arc, + p2p: Arc, + + capabilities: p2p::Capabilities, +} + +impl Seeder { + pub fn new(capabilities: p2p::Capabilities, + peer_store: Arc, + p2p: Arc) + -> Seeder { + Seeder { + peer_store: peer_store, + p2p: p2p, + capabilities: capabilities, + } + } + + pub fn connect_and_monitor(&self, + h: reactor::Handle, + seed_list: Box, Error = String>>) { + // open a channel with a listener that connects every peer address sent below + // max peer count + let (tx, rx) = futures::sync::mpsc::unbounded(); + h.spawn(self.listen_for_addrs(h.clone(), rx)); + + // check seeds and start monitoring connections + let seeder = self.connect_to_seeds(tx.clone(), seed_list) + .join(self.monitor_peers(tx.clone())); + + h.spawn(seeder.map(|_| ()).map_err(|_| ())); + } + + fn monitor_peers(&self, + tx: mpsc::UnboundedSender) + -> Box> { + let peer_store = self.peer_store.clone(); + let p2p_server = self.p2p.clone(); + + // now spawn a new future to regularly check if we need to acquire more peers + // and if so, gets them from db + let mon_loop = Timer::default() + .interval(time::Duration::from_secs(10)) + .for_each(move |_| { + if p2p_server.peer_count() < PEER_PREFERRED_COUNT { + let mut peers = peer_store.find_peers(p2p::State::Healthy, + p2p::UNKNOWN, + (2 * PEER_MAX_COUNT) as usize); + debug!("Got {} more peers from db, trying to connect.", peers.len()); + thread_rng().shuffle(&mut peers[..]); + let sz = min(PEER_PREFERRED_COUNT as usize, peers.len()); + for p in &peers[0..sz] { + tx.send(p.addr).unwrap(); + } + } + Ok(()) + // TODO clean disconnected server peers + }) + .map_err(|e| e.to_string()); + Box::new(mon_loop) + } + + // Check if we have any pre-existing peer in db. If so, start with those, + // otherwise use the seeds provided. + fn connect_to_seeds(&self, + tx: mpsc::UnboundedSender, + seed_list: Box, Error = String>>) + -> Box> { + let peer_store = self.peer_store.clone(); + + // a thread pool is required so we don't block the event loop with a + // db query + let thread_pool = cpupool::CpuPool::new(1); + let seeder = thread_pool.spawn_fn(move || { + // check if we have some peers in db + Ok(peer_store.find_peers(p2p::State::Healthy, + p2p::FULL_HIST, + (2 * PEER_MAX_COUNT) as usize)) + }) + .and_then(|mut peers| { + // if so, get their addresses, otherwise use our seeds + if peers.len() > 0 { + thread_rng().shuffle(&mut peers[..]); + Box::new(future::ok(peers.iter().map(|p| p.addr).collect::>())) + } else { + seed_list + } + }) + .and_then(move |peer_addrs| { + // connect to this first set of addresses + let sz = min(PEER_PREFERRED_COUNT as usize, peer_addrs.len()); + for addr in &peer_addrs[0..sz] { + debug!("Connecting to seed: {}.", addr); + tx.send(*addr).unwrap(); + } + Ok(()) + }); + Box::new(seeder) + } + + /// Builds a future to continuously listen on a channel receiver for new + /// addresses to and initiate a connection if the max peer count isn't + /// exceeded. A request for more peers is also automatically sent after + /// connection. + fn listen_for_addrs(&self, + h: reactor::Handle, + rx: mpsc::UnboundedReceiver) + -> Box> { + let capab = self.capabilities; + let p2p_server = self.p2p.clone(); + + let listener = rx.for_each(move |peer_addr| { + debug!("New peer address to connect to: {}.", peer_addr); + let inner_h = h.clone(); + if p2p_server.peer_count() < PEER_MAX_COUNT { + connect_and_req(capab, p2p_server.clone(), inner_h, peer_addr) + } else { + Box::new(future::ok(())) + } + }); + Box::new(listener) + } +} + +/// Extract the list of seeds from a pre-defined gist. Easy method until we +/// have a set of DNS names we can rely on. +pub fn gist_seeds(h: reactor::Handle) -> Box, Error = String>> { + let url = hyper::Url::parse(&GIST_SEEDS_URL).unwrap(); + let seeds = future::ok(()).and_then(move |_| { + let client = hyper::Client::new(&h); + + // http get, filtering out non 200 results + client.get(url) + .map_err(|e| e.to_string()) + .and_then(|res| { + if *res.status() != hyper::Ok { + return Err(format!("Gist request failed: {}", res.status())); + } + Ok(res) + }) + .and_then(|res| { + + // collect all chunks and split around whitespace to get a list of SocketAddr + res.body().collect().map_err(|e| e.to_string()).and_then(|chunks| { + let res = chunks.iter().fold("".to_string(), |acc, ref chunk| { + acc + str::from_utf8(&chunk[..]).unwrap() + }); + let addrs = + res.split_whitespace().map(|s| s.parse().unwrap()).collect::>(); + Ok(addrs) + }) + }) + }); + Box::new(seeds) +} + +/// Convenience function when the seed list is immediately known. Mostly used +/// for tests. +pub fn predefined_seeds(addrs_str: Vec) + -> Box, Error = String>> { + let seeds = future::ok(()) + .and_then(move |_| Ok(addrs_str.iter().map(|s| s.parse().unwrap()).collect::>())); + Box::new(seeds) +} + +fn connect_and_req(capab: p2p::Capabilities, + p2p: Arc, + h: reactor::Handle, + addr: SocketAddr) + -> Box> { + let fut = p2p.connect_peer(addr, h) + .and_then(move |p| { + if let Some(p) = p { + p.send_peer_request(capab); + } + Ok(()) + }) + .map_err(|e| { + error!("Peer request error {:?}", e); + () + }); + Box::new(fut) +} diff --git a/grin/src/server.rs b/grin/src/server.rs index 7d07caf6f..7e816e566 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -20,7 +20,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::thread; -use futures::Future; +use futures::{future, Future}; use tokio_core::reactor; use adapters::{NetToChainAdapter, ChainToNetAdapter}; @@ -29,6 +29,7 @@ use chain::ChainStore; use core; use miner; use p2p; +use seed; use store; use sync; @@ -44,14 +45,39 @@ pub enum Error { StoreErr(store::Error), } +impl From for Error { + fn from(e: store::Error) -> Error { + Error::StoreErr(e) + } +} + +/// Type of seeding the server will use to find other peers on the network. +#[derive(Debug, Clone)] +pub enum Seeding { + /// No seeding, mostly for tests that programmatically connect + None, + /// A list of seed addresses provided to the server + List(Vec), + /// Automatically download a gist with a list of server addresses + Gist, +} + /// Full server configuration, aggregating configurations required for the /// different components. #[derive(Debug, Clone)] pub struct ServerConfig { /// Directory under which the rocksdb stores will be created pub db_root: String, + /// Allows overriding the default cuckoo cycle size pub cuckoo_size: u8, + + /// Capabilities expose by this node, also conditions which other peers this + /// node will have an affinity toward when connection. + pub capabilities: p2p::Capabilities, + + pub seeding_type: Seeding, + /// Configuration for the peer-to-peer server pub p2p_config: p2p::P2PConfig, } @@ -61,6 +87,8 @@ impl Default for ServerConfig { ServerConfig { db_root: ".grin".to_string(), cuckoo_size: 0, + capabilities: p2p::FULL_NODE, + seeding_type: Seeding::None, p2p_config: p2p::P2PConfig::default(), } } @@ -68,7 +96,7 @@ impl Default for ServerConfig { /// Grin server holding internal structures. pub struct Server { - config: ServerConfig, + pub config: ServerConfig, evt_handle: reactor::Handle, /// handle to our network server p2p: Arc, @@ -84,32 +112,10 @@ pub struct Server { impl Server { /// Instantiates and starts a new server. pub fn start(config: ServerConfig) -> Result { - let (chain_store, head) = try!(store_head(&config)); - let shared_head = Arc::new(Mutex::new(head)); - - let chain_adapter = Arc::new(ChainToNetAdapter::new()); - let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), - chain_store.clone(), - chain_adapter.clone())); - let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone())); - chain_adapter.init(server.clone()); - - let sync = sync::Syncer::new(chain_store.clone(), server.clone()); - net_adapter.start_sync(sync); - let mut evtlp = reactor::Core::new().unwrap(); - let handle = evtlp.handle(); - evtlp.run(server.start(handle.clone())).unwrap(); - - warn!("Grin server started."); - Ok(Server { - config: config, - evt_handle: handle.clone(), - p2p: server, - chain_head: shared_head, - chain_store: chain_store, - chain_adapter: chain_adapter, - }) + let serv = Server::future(config, &evtlp.handle()); + evtlp.run(future::ok::<(), ()>(())).unwrap(); + serv } /// Instantiates a new server associated with the provided future reactor. @@ -117,13 +123,28 @@ impl Server { let (chain_store, head) = try!(store_head(&config)); let shared_head = Arc::new(Mutex::new(head)); + let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); + let chain_adapter = Arc::new(ChainToNetAdapter::new()); let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(), chain_store.clone(), - chain_adapter.clone())); - let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone())); + chain_adapter.clone(), + peer_store.clone())); + let server = + Arc::new(p2p::Server::new(config.capabilities, config.p2p_config, net_adapter.clone())); chain_adapter.init(server.clone()); + let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), server.clone()); + match config.seeding_type.clone() { + Seeding::None => {} + Seeding::List(seeds) => { + seed.connect_and_monitor(evt_handle.clone(), seed::predefined_seeds(seeds)); + } + Seeding::Gist => { + seed.connect_and_monitor(evt_handle.clone(), seed::gist_seeds(evt_handle.clone())); + } + } + let sync = sync::Syncer::new(chain_store.clone(), server.clone()); net_adapter.start_sync(sync); @@ -143,10 +164,14 @@ impl Server { /// Asks the server to connect to a peer at the provided network address. pub fn connect_peer(&self, addr: SocketAddr) -> Result<(), Error> { let handle = self.evt_handle.clone(); - handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map_err(|_| ())); + handle.spawn(self.p2p.connect_peer(addr, handle.clone()).map(|_| ()).map_err(|_| ())); Ok(()) } + pub fn peer_count(&self) -> u32 { + self.p2p.peer_count() + } + /// Start mining for blocks on a separate thread. Relies on a toy miner, /// mostly for testing. pub fn start_miner(&self) { diff --git a/grin/tests/simulnet.rs b/grin/tests/simulnet.rs index a5b39f406..1d8b582c1 100644 --- a/grin/tests/simulnet.rs +++ b/grin/tests/simulnet.rs @@ -20,15 +20,20 @@ extern crate grin_chain as chain; extern crate env_logger; extern crate futures; extern crate tokio_core; +extern crate tokio_timer; use std::io; use std::thread; use std::time; +use std::default::Default; use futures::{Future, Poll, Async}; use futures::task::park; use tokio_core::reactor; +use tokio_timer::Timer; +/// Create a network of 5 servers and mine a block, verifying that the block +/// gets propagated to all. #[test] fn simulate_block_propagation() { env_logger::init(); @@ -43,7 +48,8 @@ fn simulate_block_propagation() { grin::ServerConfig{ db_root: format!("target/grin-prop-{}", n), cuckoo_size: 12, - p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()} + p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()}, + ..Default::default() }, &handle).unwrap(); servers.push(s); } @@ -69,6 +75,8 @@ fn simulate_block_propagation() { })); } +/// Creates 2 different disconnected servers, mine a few blocks on one, connect +/// them and check that the 2nd gets all the blocks #[test] fn simulate_full_sync() { env_logger::init(); @@ -83,7 +91,8 @@ fn simulate_full_sync() { grin::ServerConfig{ db_root: format!("target/grin-sync-{}", n), cuckoo_size: 12, - p2p_config: p2p::P2PConfig{port: 11000+n, ..p2p::P2PConfig::default()} + p2p_config: p2p::P2PConfig{port: 11000+n, ..p2p::P2PConfig::default()}, + ..Default::default() }, &handle).unwrap(); servers.push(s); } @@ -100,6 +109,39 @@ fn simulate_full_sync() { evtlp.run(change(&servers[1])); } +/// Creates 5 servers, one being a seed and check that through peer address +/// messages they all end up connected. +#[test] +fn simulate_seeding() { + env_logger::init(); + + let mut evtlp = reactor::Core::new().unwrap(); + let handle = evtlp.handle(); + + // instantiates 5 servers on different ports, with 0 as a seed + let mut servers = vec![]; + for n in 0..5 { + let s = grin::Server::future( + grin::ServerConfig{ + db_root: format!("target/grin-seed-{}", n), + cuckoo_size: 12, + p2p_config: p2p::P2PConfig{port: 12000+n, ..p2p::P2PConfig::default()}, + seeding_type: grin::Seeding::List(vec!["127.0.0.1:10000".to_string()]), + ..Default::default() + }, &handle).unwrap(); + servers.push(s); + } + + // wait a bit and check all servers are now connected + evtlp.run(Timer::default().sleep(time::Duration::from_secs(30)).and_then(|_| { + for s in servers { + // occasionally 2 peers will connect to each other at the same time + assert!(s.peer_count() >= 4); + } + Ok(()) + })); +} + // Builds the change future, monitoring for a change of head on the provided server fn change<'a>(s: &'a grin::Server) -> HeadChange<'a> { let start_head = s.head(); diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index ddf17a444..a7ff9f3d5 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -223,7 +223,7 @@ impl Connection { pub struct TimeoutConnection { underlying: Connection, - expected_responses: Arc, Instant)>>>, + expected_responses: Arc, Instant)>>>, } impl TimeoutConnection { @@ -244,15 +244,13 @@ impl TimeoutConnection { let recv_h = try!(handler.handle(sender, header, data)); let mut expects = exp.lock().unwrap(); - println!("EXP1 {}", expects.len()); let filtered = expects.iter() - .filter(|&&(typ, h, _, _)| { - msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h + .filter(|&&(typ, h, _): &&(Type, Option, Instant)| { + msg_type != typ || h.is_some() && recv_h != h }) .map(|&x| x) .collect::>(); *expects = filtered; - println!("EXP2 {}", expects.len()); Ok(recv_h) }); @@ -263,7 +261,7 @@ impl TimeoutConnection { .interval(Duration::new(2, 0)) .fold((), move |_, _| { let exp = exp.lock().unwrap(); - for &(_, _, _, t) in exp.deref() { + for &(_, _, t) in exp.deref() { if Instant::now() - t > Duration::new(2, 0) { return Err(TimerError::TooLong); } @@ -283,17 +281,14 @@ impl TimeoutConnection { /// optionally the hash of the sent data. pub fn send_request(&self, t: Type, + rt: Type, body: &ser::Writeable, - expect_h: Option<(Type, Hash)>) + expect_h: Option<(Hash)>) -> Result<(), ser::Error> { let sent = try!(self.underlying.send_msg(t, body)); let mut expects = self.expected_responses.lock().unwrap(); - if let Some((rt, h)) = expect_h { - expects.push((rt, h, None, Instant::now())); - } else { - expects.push((t, ZERO_HASH, None, Instant::now())); - } + expects.push((rt, expect_h, Instant::now())); Ok(()) } diff --git a/p2p/src/handshake.rs b/p2p/src/handshake.rs index 630ce3859..03a1c7f97 100644 --- a/p2p/src/handshake.rs +++ b/p2p/src/handshake.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::VecDeque; +use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use futures::Future; @@ -47,17 +48,19 @@ impl Handshake { /// Handles connecting to a new remote peer, starting the version handshake. pub fn connect(&self, + capab: Capabilities, total_difficulty: Difficulty, + self_addr: SocketAddr, conn: TcpStream) -> Box> { // prepare the first part of the hanshake let nonce = self.next_nonce(); let hand = Hand { version: PROTOCOL_VERSION, - capabilities: FULL_HIST, + capabilities: capab, nonce: nonce, total_difficulty: total_difficulty, - sender_addr: SockAddr(conn.local_addr().unwrap()), + sender_addr: SockAddr(self_addr), receiver_addr: SockAddr(conn.peer_addr().unwrap()), user_agent: USER_AGENT.to_string(), }; @@ -90,6 +93,7 @@ impl Handshake { /// Handles receiving a connection from a new remote peer that started the /// version handshake. pub fn handshake(&self, + capab: Capabilities, total_difficulty: Difficulty, conn: TcpStream) -> Box> { @@ -116,20 +120,21 @@ impl Handshake { let peer_info = PeerInfo { capabilities: hand.capabilities, user_agent: hand.user_agent, - addr: conn.peer_addr().unwrap(), + addr: hand.sender_addr.0, version: hand.version, total_difficulty: hand.total_difficulty, }; // send our reply with our info let shake = Shake { version: PROTOCOL_VERSION, - capabilities: FULL_HIST, + capabilities: capab, total_difficulty: total_difficulty, user_agent: USER_AGENT.to_string(), }; Ok((conn, shake, peer_info)) }) .and_then(|(conn, shake, peer_info)| { + debug!("Success handshake with {}.", peer_info.addr); write_msg(conn, shake, Type::Shake) // when more than one protocol version is supported, choosing should go here .map(|conn| (conn, ProtocolV1::new(), peer_info)) diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 80d7b9332..b2ead4cf4 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -44,9 +44,11 @@ mod msg; mod peer; mod protocol; mod server; -pub mod store; +mod store; mod types; pub use server::{Server, DummyAdapter}; pub use peer::Peer; -pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS}; +pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, + Capabilities, UNKNOWN, FULL_NODE, FULL_HIST, PeerInfo}; +pub use store::{PeerStore, PeerData, State}; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index e5398ac83..da54a8bcf 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -311,10 +311,16 @@ impl Writeable for PeerAddrs { impl Readable for PeerAddrs { fn read(reader: &mut Reader) -> Result { let peer_count = try!(reader.read_u32()); - if peer_count > 1000 { + if peer_count > MAX_PEER_ADDRS { return Err(ser::Error::TooLargeReadErr); + } else if peer_count == 0 { + return Ok(PeerAddrs { peers: vec![] }); + } + // let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader)); + let mut peers = Vec::with_capacity(peer_count as usize); + for _ in 0..peer_count { + peers.push(SockAddr::read(reader)?); } - let peers = try_map_vec!([0..peer_count], |_| SockAddr::read(reader)); Ok(PeerAddrs { peers: peers }) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 741e420bb..e3a59de62 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::net::SocketAddr; use std::sync::Arc; use futures::Future; @@ -35,31 +36,36 @@ unsafe impl Send for Peer {} impl Peer { /// Initiates the handshake with another peer. pub fn connect(conn: TcpStream, + capab: Capabilities, total_difficulty: Difficulty, + self_addr: SocketAddr, hs: &Handshake) -> Box> { - let connect_peer = hs.connect(total_difficulty, conn).and_then(|(conn, proto, info)| { - Ok((conn, - Peer { - info: info, - proto: Box::new(proto), - })) - }); + let connect_peer = hs.connect(capab, total_difficulty, self_addr, conn) + .and_then(|(conn, proto, info)| { + Ok((conn, + Peer { + info: info, + proto: Box::new(proto), + })) + }); Box::new(connect_peer) } /// Accept a handshake initiated by another peer. pub fn accept(conn: TcpStream, + capab: Capabilities, total_difficulty: Difficulty, hs: &Handshake) -> Box> { - let hs_peer = hs.handshake(total_difficulty, conn).and_then(|(conn, proto, info)| { - Ok((conn, - Peer { - info: info, - proto: Box::new(proto), - })) - }); + let hs_peer = hs.handshake(capab, total_difficulty, conn) + .and_then(|(conn, proto, info)| { + Ok((conn, + Peer { + info: info, + proto: Box::new(proto), + })) + }); Box::new(hs_peer) } @@ -102,6 +108,11 @@ impl Peer { self.proto.send_block_request(h) } + pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> { + debug!("Asking {} for more peers.", self.info.addr); + self.proto.send_peer_request(capab) + } + pub fn stop(&self) { self.proto.close(); } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 5a722af58..28ff3d49f 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -68,7 +68,7 @@ impl Protocol for ProtocolV1 { /// Sends a ping message to the remote peer. Will panic if handle has never /// been called on this protocol. fn send_ping(&self) -> Result<(), ser::Error> { - self.send_request(Type::Ping, &Empty {}, None) + self.send_request(Type::Ping, Type::Pong, &Empty {}, None) } /// Serializes and sends a block to our remote peer @@ -82,11 +82,21 @@ impl Protocol for ProtocolV1 { } fn send_header_request(&self, locator: Vec) -> Result<(), ser::Error> { - self.send_request(Type::GetHeaders, &Locator { hashes: locator }, None) + self.send_request(Type::GetHeaders, + Type::Headers, + &Locator { hashes: locator }, + None) } fn send_block_request(&self, h: Hash) -> Result<(), ser::Error> { - self.send_request(Type::GetBlock, &h, Some((Type::Block, h))) + self.send_request(Type::GetBlock, Type::Block, &h, Some(h)) + } + + fn send_peer_request(&self, capab: Capabilities) -> Result<(), ser::Error> { + self.send_request(Type::GetPeerAddrs, + Type::PeerAddrs, + &GetPeerAddrs { capabilities: capab }, + None) } /// Close the connection to the remote peer @@ -102,10 +112,11 @@ impl ProtocolV1 { fn send_request(&self, t: Type, + rt: Type, body: &ser::Writeable, - expect_resp: Option<(Type, Hash)>) + expect_resp: Option) -> Result<(), ser::Error> { - self.conn.borrow().send_request(t, body, expect_resp) + self.conn.borrow().send_request(t, rt, body, expect_resp) } } @@ -168,6 +179,29 @@ fn handle_payload(adapter: &NetAdapter, adapter.headers_received(headers.headers); Ok(None) } + Type::GetPeerAddrs => { + let get_peers = ser::deserialize::(&mut &buf[..])?; + let peer_addrs = adapter.find_peer_addrs(get_peers.capabilities); + + // serialize and send all the headers over + let mut body_data = vec![]; + try!(ser::serialize(&mut body_data, + &PeerAddrs { + peers: peer_addrs.iter().map(|sa| SockAddr(*sa)).collect(), + })); + let mut data = vec![]; + try!(ser::serialize(&mut data, + &MsgHeader::new(Type::PeerAddrs, body_data.len() as u64))); + data.append(&mut body_data); + sender.send(data); + + Ok(None) + } + Type::PeerAddrs => { + let peer_addrs = ser::deserialize::(&mut &buf[..])?; + adapter.peer_addrs_received(peer_addrs.peers.iter().map(|pa| pa.0).collect()); + Ok(None) + } _ => { debug!("unknown message type {:?}", header.msg_type); Ok(None) diff --git a/p2p/src/server.rs b/p2p/src/server.rs index ecf31d393..b8f43e421 100644 --- a/p2p/src/server.rs +++ b/p2p/src/server.rs @@ -23,7 +23,7 @@ use std::time::Duration; use futures; use futures::{Future, Stream}; -use futures::future::IntoFuture; +use futures::future::{self, IntoFuture}; use rand::{self, Rng}; use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor; @@ -51,12 +51,18 @@ impl NetAdapter for DummyAdapter { fn get_block(&self, h: Hash) -> Option { None } + fn find_peer_addrs(&self, capab: Capabilities) -> Vec { + vec![] + } + fn peer_addrs_received(&self, peer_addrs: Vec) {} + fn peer_connected(&self, pi: &PeerInfo) {} } /// P2P server implementation, handling bootstrapping to find and connect to /// peers, receiving connections from other peers and keep track of all of them. pub struct Server { config: P2PConfig, + capabilities: Capabilities, peers: Arc>>>, adapter: Arc, stop: RefCell>>, @@ -68,9 +74,10 @@ unsafe impl Send for Server {} // TODO TLS impl Server { /// Creates a new idle p2p server with no peers - pub fn new(config: P2PConfig, adapter: Arc) -> Server { + pub fn new(capab: Capabilities, config: P2PConfig, adapter: Arc) -> Server { Server { config: config, + capabilities: capab, peers: Arc::new(RwLock::new(Vec::new())), adapter: adapter, stop: RefCell::new(None), @@ -87,6 +94,7 @@ impl Server { let hs = Arc::new(Handshake::new()); let peers = self.peers.clone(); let adapter = self.adapter.clone(); + let capab = self.capabilities.clone(); // main peer acceptance future handling handshake let hp = h.clone(); @@ -96,7 +104,9 @@ impl Server { let peers = peers.clone(); // accept the peer and add it to the server map - let peer_accept = add_to_peers(peers, Peer::accept(conn, total_diff, &hs.clone())); + let peer_accept = add_to_peers(peers, + adapter.clone(), + Peer::accept(conn, capab, total_diff, &hs.clone())); // wire in a future to timeout the accept after 5 secs let timed_peer = with_timeout(Box::new(peer_accept), &hp); @@ -136,23 +146,49 @@ impl Server { pub fn connect_peer(&self, addr: SocketAddr, h: reactor::Handle) - -> Box> { + -> Box>, Error = Error>> { + for p in self.peers.read().unwrap().deref() { + // if we're already connected to the addr, just return the peer + if p.info.addr == addr { + return Box::new(future::ok(Some((*p).clone()))); + } + } + // asked to connect to ourselves + if addr.ip() == self.config.host && addr.port() == self.config.port { + return Box::new(future::ok(None)); + } let peers = self.peers.clone(); let adapter1 = self.adapter.clone(); let adapter2 = self.adapter.clone(); + let capab = self.capabilities.clone(); + let self_addr = SocketAddr::new(self.config.host, self.config.port); + + debug!("{} connecting to {}", self_addr, addr); let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e)); + let h2 = h.clone(); let request = socket.and_then(move |socket| { let peers = peers.clone(); - let total_diff = adapter1.total_difficulty(); + let total_diff = adapter1.clone().total_difficulty(); // connect to the peer and add it to the server map, wiring it a timeout for // the handhake - let peer_connect = - add_to_peers(peers, Peer::connect(socket, total_diff, &Handshake::new())); + let peer_connect = add_to_peers(peers, + adapter1, + Peer::connect(socket, + capab, + total_diff, + self_addr, + &Handshake::new())); with_timeout(Box::new(peer_connect), &h) }) - .and_then(move |(socket, peer)| peer.run(socket, adapter2)); + .and_then(move |(socket, peer)| { + h2.spawn(peer.run(socket, adapter2).map_err(|e| { + error!("Peer error: {:?}", e); + () + })); + Ok(Some(peer)) + }); Box::new(request) } @@ -212,11 +248,13 @@ impl Server { // Adds the peer built by the provided future in the peers map fn add_to_peers(peers: Arc>>>, + adapter: Arc, peer_fut: A) -> Box), ()>, Error = Error>> where A: IntoFuture + 'static { let peer_add = peer_fut.into_future().map(move |(conn, peer)| { + adapter.peer_connected(&peer.info); let apeer = Arc::new(peer); let mut peers = peers.write().unwrap(); peers.push(apeer.clone()); diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 2abdcdee0..c628fd202 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -36,14 +36,14 @@ enum_from_primitive! { } } -pub struct Peer { +pub struct PeerData { pub addr: SocketAddr, pub capabilities: Capabilities, pub user_agent: String, pub flags: State, } -impl Writeable for Peer { +impl Writeable for PeerData { fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> { SockAddr(self.addr).write(writer)?; ser_multiwrite!(writer, @@ -54,15 +54,15 @@ impl Writeable for Peer { } } -impl Readable for Peer { - fn read(reader: &mut Reader) -> Result { +impl Readable for PeerData { + fn read(reader: &mut Reader) -> Result { let addr = SockAddr::read(reader)?; let (capab, ua, fl) = ser_multiread!(reader, read_u32, read_vec, read_u8); let user_agent = String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)?; let capabilities = Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData)?; match State::from_u8(fl) { Some(flags) => { - Ok(Peer { + Ok(PeerData { addr: addr.0, capabilities: capabilities, user_agent: user_agent, @@ -84,18 +84,22 @@ impl PeerStore { Ok(PeerStore { db: db }) } - pub fn save_peer(&self, p: &Peer) -> Result<(), Error> { + pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { self.db.put_ser(&to_key(PEER_PREFIX, &mut format!("{}", p.addr).into_bytes())[..], p) } + pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result { + self.db.exists(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) + } + pub fn delete_peer(&self, peer_addr: SocketAddr) -> Result<(), Error> { self.db.delete(&to_key(PEER_PREFIX, &mut format!("{}", peer_addr).into_bytes())[..]) } - pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { + pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { let peers_iter = self.db - .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); + .iter::(&to_key(PEER_PREFIX, &mut "".to_string().into_bytes())); let mut peers = Vec::with_capacity(count); for p in peers_iter { if p.flags == state && p.capabilities.contains(cap) { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 0eba0ee5f..7e420eed1 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -32,6 +32,9 @@ pub const MAX_BLOCK_HEADERS: u32 = 512; /// Maximum number of block bodies a peer should ever ask for and send pub const MAX_BLOCK_BODIES: u32 = 16; +/// Maximum number of peer addresses a peer should ever send +pub const MAX_PEER_ADDRS: u32 = 256; + /// Configuration for the peer-to-peer server. #[derive(Debug, Clone, Copy)] pub struct P2PConfig { @@ -60,6 +63,10 @@ bitflags! { /// Can provide block headers and the UTXO set for some recent-enough /// height. const UTXO_HIST = 0b00000010, + /// Can provide a list of healthy peers + const PEER_LIST = 0b00000100, + + const FULL_NODE = FULL_HIST.bits | UTXO_HIST.bits | PEER_LIST.bits, } } @@ -101,6 +108,9 @@ pub trait Protocol { /// Sends a request for a block from its hash. fn send_block_request(&self, h: Hash) -> Result<(), Error>; + /// Sends a request for some peer addresses. + fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error>; + /// How many bytes have been sent/received to/from the remote peer. fn transmitted_bytes(&self) -> (u64, u64); @@ -133,4 +143,14 @@ pub trait NetAdapter: Sync + Send { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option; + + /// Find good peers we know with the provided capability and return their + /// addresses. + fn find_peer_addrs(&self, capab: Capabilities) -> Vec; + + /// A list of peers has been received from one of our peers. + fn peer_addrs_received(&self, Vec); + + /// Network successfully connected to a peer. + fn peer_connected(&self, &PeerInfo); } diff --git a/store/src/lib.rs b/store/src/lib.rs index 2dfdcdf92..791d10d83 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -130,6 +130,12 @@ impl Store { } } + /// Whether the provided key exists + pub fn exists(&self, key: &[u8]) -> Result { + let db = self.rdb.read().unwrap(); + db.get(key).map(|r| r.is_some()).map_err(From::from) + } + /// Deletes a key/value pair from the db pub fn delete(&self, key: &[u8]) -> Result<(), Error> { let db = self.rdb.write().unwrap();