Added full node sync mode. Follows closely the bitcoin header-first sync. Related p2p messages and protocol.

This commit is contained in:
Ignotus Peverell 2017-02-07 13:52:17 -08:00
parent f6114231ae
commit 4af049a887
No known key found for this signature in database
GPG key ID: 99CD25F39F8F8211
15 changed files with 643 additions and 70 deletions

View file

@ -12,12 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::thread;
use chain::{self, ChainAdapter};
use core::core;
use p2p::{NetAdapter, Server};
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use p2p::{self, NetAdapter, Server};
use util::OneTime;
use sync;
/// Implementation of the NetAdapter for the blockchain. Gets notified when new
/// blocks and transactions are received and forwards to the chain and pool
@ -27,23 +32,32 @@ pub struct NetToChainAdapter {
chain_head: Arc<Mutex<chain::Tip>>,
chain_store: Arc<chain::ChainStore>,
chain_adapter: Arc<ChainToNetAdapter>,
syncer: OneTime<Arc<sync::Syncer>>,
}
impl NetAdapter for NetToChainAdapter {
fn height(&self) -> u64 {
self.chain_head.lock().unwrap().height
fn total_difficulty(&self) -> Difficulty {
self.chain_head.lock().unwrap().clone().total_difficulty
}
fn transaction_received(&self, tx: core::Transaction) {
unimplemented!();
}
fn block_received(&self, b: core::Block) {
// TODO delegate to a separate thread to avoid holding up the caller
debug!("Received block {} from network, going to process.",
b.hash());
// pushing the new block through the chain pipeline
let store = self.chain_store.clone();
let chain_adapter = self.chain_adapter.clone();
let res = chain::process_block(&b, store, chain_adapter, chain::NONE);
let opts = if self.syncer.borrow().syncing() {
chain::SYNC
} else {
chain::NONE
};
let res = chain::process_block(&b, store, chain_adapter, opts);
// log errors and update the shared head reference on success
if let Err(e) = res {
@ -53,6 +67,94 @@ impl NetAdapter for NetToChainAdapter {
let mut head = chain_head.lock().unwrap();
*head = tip;
}
if self.syncer.borrow().syncing() {
self.syncer.borrow().block_received(b.hash());
}
}
fn headers_received(&self, bhs: Vec<core::BlockHeader>) {
let opts = if self.syncer.borrow().syncing() {
chain::SYNC
} else {
chain::NONE
};
// try to add each header to our header chain
let mut added_hs = vec![];
for bh in bhs {
let store = self.chain_store.clone();
let chain_adapter = self.chain_adapter.clone();
let res = chain::process_block_header(&bh, store, chain_adapter, opts);
match res {
Ok(_) => {
added_hs.push(bh.hash());
}
Err(chain::Error::Unfit(_)) => {
info!("Received unfit block header {} at {}.",
bh.hash(),
bh.height);
}
Err(chain::Error::StoreErr(e)) => {
error!("Store error processing block header {}: {:?}", bh.hash(), e);
return;
}
Err(e) => {
info!("Invalid block header {}: {:?}.", bh.hash(), e);
// TODO penalize peer somehow
}
}
}
info!("Added {} headers to the header chain.", added_hs.len());
if self.syncer.borrow().syncing() {
self.syncer.borrow().headers_received(added_hs);
}
}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {
if locator.len() == 0 {
return vec![];
}
// go through the locator vector and check if we know any of these headers
let known = self.chain_store.get_block_header(&locator[0]);
let header = match known {
Ok(header) => header,
Err(chain::types::Error::NotFoundErr) => {
return self.locate_headers(locator[1..].to_vec());
}
Err(e) => {
error!("Could not build header locator: {:?}", e);
return vec![];
}
};
// looks like we know one, getting as many following headers as allowed
let hh = header.height;
let mut headers = vec![];
for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
let header = self.chain_store.get_header_by_height(h);
match header {
Ok(head) => headers.push(head),
Err(chain::types::Error::NotFoundErr) => break,
Err(e) => {
error!("Could not build header locator: {:?}", e);
return vec![];
}
}
}
headers
}
fn get_block(&self, h: Hash) -> Option<core::Block> {
let store = self.chain_store.clone();
let b = store.get_block(&h);
match b {
Ok(b) => Some(b),
_ => None,
}
}
}
@ -65,8 +167,17 @@ impl NetToChainAdapter {
chain_head: chain_head,
chain_store: chain_store,
chain_adapter: chain_adapter,
syncer: OneTime::new(),
}
}
pub fn start_sync(&self, sync: sync::Syncer) {
let arc_sync = Arc::new(sync);
self.syncer.init(arc_sync.clone());
thread::Builder::new().name("syncer".to_string()).spawn(move || {
arc_sync.run();
});
}
}
/// Implementation of the ChainAdapter for the network. Gets notified when the

View file

@ -39,5 +39,6 @@ extern crate secp256k1zkp as secp;
mod adapters;
mod miner;
mod server;
mod sync;
pub use server::{Server, ServerConfig};

View file

@ -23,8 +23,6 @@ use adapters::ChainToNetAdapter;
use core::consensus;
use core::core;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use core::pow;
use core::pow::cuckoo;
use chain;
use secp;

View file

@ -29,6 +29,7 @@ use chain::ChainStore;
use core;
use miner;
use p2p;
use sync;
/// Errors than can be reported by a server implementation, mostly wraps
/// underlying components errors.
@ -89,9 +90,12 @@ impl Server {
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone()));
chain_adapter.init(server.clone());
let sync = sync::Syncer::new(chain_store.clone(), server.clone());
net_adapter.start_sync(sync);
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
evtlp.run(server.start(handle.clone())).unwrap();
@ -116,9 +120,12 @@ impl Server {
let net_adapter = Arc::new(NetToChainAdapter::new(shared_head.clone(),
chain_store.clone(),
chain_adapter.clone()));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter));
let server = Arc::new(p2p::Server::new(config.p2p_config, net_adapter.clone()));
chain_adapter.init(server.clone());
let sync = sync::Syncer::new(chain_store.clone(), server.clone());
net_adapter.start_sync(sync);
evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ()));
warn!("Grin server started.");
@ -173,7 +180,7 @@ fn store_head(config: &ServerConfig)
if config.cuckoo_size > 0 {
gen.header.cuckoo_len = config.cuckoo_size;
let diff = gen.header.difficulty.clone();
core::pow::pow(&mut gen, diff).unwrap();
core::pow::pow(&mut gen.header, diff).unwrap();
}
try!(chain_store.save_block(&gen).map_err(&Error::StoreErr));
let tip = chain::types::Tip::new(gen.hash());

233
grin/src/sync.rs Normal file
View file

@ -0,0 +1,233 @@
// Copyright 2016 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Synchronization of the local blockchain with the rest of the network. Used
//! either on a brand new node or when a node is late based on others' heads.
//! Always starts by downloading the header chain before asking either for full
//! blocks or a full UTXO set with related information.
/// How many block bodies to download in parallel
const MAX_BODY_DOWNLOADS: usize = 8;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Instant, Duration};
use core::core::hash::{Hash, Hashed};
use chain;
use p2p;
pub struct Syncer {
chain_store: Arc<chain::ChainStore>,
p2p: Arc<p2p::Server>,
sync: Mutex<bool>,
last_header_req: Mutex<Instant>,
blocks_to_download: Mutex<Vec<Hash>>,
blocks_downloading: Mutex<Vec<(Hash, Instant)>>,
}
impl Syncer {
pub fn new(chain_store: Arc<chain::ChainStore>, p2p: Arc<p2p::Server>) -> Syncer {
Syncer {
chain_store: chain_store,
p2p: p2p,
sync: Mutex::new(true),
last_header_req: Mutex::new(Instant::now() - Duration::from_secs(2)),
blocks_to_download: Mutex::new(vec![]),
blocks_downloading: Mutex::new(vec![]),
}
}
pub fn syncing(&self) -> bool {
*self.sync.lock().unwrap()
}
/// Checks the local chain state, comparing it with our peers and triggers
/// syncing if required.
pub fn run(&self) -> Result<(), chain::Error> {
debug!("Starting syncer.");
let start = Instant::now();
loop {
let pc = self.p2p.peer_count();
if pc > 3 {
break;
}
if pc > 0 && (Instant::now() - start > Duration::from_secs(15)) {
break;
}
thread::sleep(Duration::from_millis(200));
}
// check if we have missing full blocks for which we already have a header
self.init_download()?;
// main syncing loop, requests more headers and bodies periodically as long
// as a peer with higher difficulty exists and we're not fully caught up
info!("Starting sync loop.");
loop {
let tip = self.chain_store.get_header_head()?;
// TODO do something better (like trying to get more) if we lose peers
let peer = self.p2p.most_work_peer().unwrap();
let more_headers = peer.info.total_difficulty > tip.total_difficulty;
let more_bodies = {
let blocks_to_download = self.blocks_to_download.lock().unwrap();
let blocks_downloading = self.blocks_downloading.lock().unwrap();
blocks_to_download.len() > 0 || blocks_downloading.len() > 0
};
{
let last_header_req = self.last_header_req.lock().unwrap().clone();
if more_headers && (Instant::now() - Duration::from_secs(2) > last_header_req) {
self.request_headers()?;
}
}
if more_bodies {
self.request_bodies();
}
if !more_headers && !more_bodies {
// TODO check we haven't been lied to on the total work
let mut sync = self.sync.lock().unwrap();
*sync = false;
break;
}
thread::sleep(Duration::from_secs(2));
}
info!("Sync done.");
Ok(())
}
/// Checks the gap between the header chain and the full block chain and
/// initializes the blocks_to_download structure with the missing full
/// blocks
fn init_download(&self) -> Result<(), chain::Error> {
// compare the header's head to the full one to see what we're missing
let header_head = self.chain_store.get_header_head()?;
let full_head = self.chain_store.head()?;
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
// go back the chain and insert for download all blocks we only have the
// head for
let mut prev_h = header_head.last_block_h;
while prev_h != full_head.last_block_h {
let header = self.chain_store.get_block_header(&prev_h)?;
blocks_to_download.push(header.hash());
prev_h = header.previous;
}
debug!("Added {} full block hashes to download.",
blocks_to_download.len());
Ok(())
}
/// Asks for the blocks we haven't downloaded yet and place them in the
/// downloading structure.
fn request_bodies(&self) {
let mut blocks_downloading = self.blocks_downloading.lock().unwrap();
if blocks_downloading.len() > MAX_BODY_DOWNLOADS {
// clean up potentially dead downloads
let twenty_sec_ago = Instant::now() - Duration::from_secs(20);
blocks_downloading.iter()
.position(|&h| h.1 < twenty_sec_ago)
.map(|n| blocks_downloading.remove(n));
} else {
// consume hashes from blocks to download, place them in downloading and
// request them from the network
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
while blocks_to_download.len() > 0 && blocks_downloading.len() < MAX_BODY_DOWNLOADS {
let h = blocks_to_download.pop().unwrap();
let peer = self.p2p.random_peer().unwrap();
peer.send_block_request(h);
blocks_downloading.push((h, Instant::now()));
}
debug!("Requesting more full block hashes to download, total: {}.",
blocks_to_download.len());
}
}
/// We added a block, clean up the downloading structure
pub fn block_received(&self, bh: Hash) {
// just clean up the downloading list
let mut bds = self.blocks_downloading.lock().unwrap();
bds.iter().position(|&h| h.0 == bh).map(|n| bds.remove(n));
}
/// Request some block headers from a peer to advance us
fn request_headers(&self) -> Result<(), chain::Error> {
{
let mut last_header_req = self.last_header_req.lock().unwrap();
*last_header_req = Instant::now();
}
let tip = self.chain_store.get_header_head()?;
let peer = self.p2p.most_work_peer();
let locator = self.get_locator(&tip)?;
if let Some(p) = peer {
debug!("Asking peer {} for more block headers.", p.info.addr);
p.send_header_request(locator)?;
} else {
warn!("Could not get most worked peer to request headers.");
}
Ok(())
}
/// We added a header, add it to the full block download list
pub fn headers_received(&self, bhs: Vec<Hash>) {
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
let hs_len = bhs.len();
for h in bhs {
// enlist for full block download
blocks_to_download.insert(0, h);
}
// ask for more headers if we got as many as required
if hs_len == (p2p::MAX_BLOCK_HEADERS as usize) {
self.request_headers();
}
}
/// Builds a vector of block hashes that should help the remote peer sending
/// us the right block headers.
fn get_locator(&self, tip: &chain::Tip) -> Result<Vec<Hash>, chain::Error> {
// Prepare the heights we want as the latests height minus increasing powers
// of 2 up to max.
let mut heights = vec![tip.height];
let mut tail = (1..p2p::MAX_LOCATORS)
.map(|n| 2u64.pow(n))
.filter_map(|n| if n > tip.height {
None
} else {
Some(tip.height - n)
})
.collect::<Vec<_>>();
heights.append(&mut tail);
// Iteratively travel the header chain back from our head and retain the
// headers at the wanted heights.
let mut header = self.chain_store.get_block_header(&tip.last_block_h)?;
let mut locator = vec![];
while heights.len() > 0 {
if header.height == heights[0] {
heights = heights[1..].to_vec();
locator.push(header.hash());
}
if header.height > 0 {
header = self.chain_store.get_block_header(&header.previous)?;
}
}
Ok(locator)
}
}

View file

@ -30,8 +30,8 @@ use futures::task::park;
use tokio_core::reactor;
#[test]
fn simulate_servers() {
env_logger::init().unwrap();
fn simulate_block_propagation() {
env_logger::init();
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
@ -41,7 +41,7 @@ fn simulate_servers() {
for n in 0..5 {
let s = grin::Server::future(
grin::ServerConfig{
db_root: format!("target/grin-{}", n),
db_root: format!("target/grin-prop-{}", n),
cuckoo_size: 12,
p2p_config: p2p::P2PConfig{port: 10000+n, ..p2p::P2PConfig::default()}
}, &handle).unwrap();
@ -69,6 +69,37 @@ fn simulate_servers() {
}));
}
#[test]
fn simulate_full_sync() {
env_logger::init();
let mut evtlp = reactor::Core::new().unwrap();
let handle = evtlp.handle();
// instantiates 2 servers on different ports
let mut servers = vec![];
for n in 0..2 {
let s = grin::Server::future(
grin::ServerConfig{
db_root: format!("target/grin-sync-{}", n),
cuckoo_size: 12,
p2p_config: p2p::P2PConfig{port: 11000+n, ..p2p::P2PConfig::default()}
}, &handle).unwrap();
servers.push(s);
}
// mine a few blocks on server 1
servers[0].start_miner();
thread::sleep(time::Duration::from_secs(15));
// connect 1 and 2
let addr = format!("{}:{}", "127.0.0.1", 11001);
servers[0].connect_peer(addr.parse().unwrap()).unwrap();
// 2 should get blocks
evtlp.run(change(&servers[1]));
}
// Builds the change future, monitoring for a change of head on the provided server
fn change<'a>(s: &'a grin::Server) -> HeadChange<'a> {
let start_head = s.head();

View file

@ -223,7 +223,7 @@ impl Connection {
pub struct TimeoutConnection {
underlying: Connection,
expected_responses: Arc<Mutex<Vec<(Type, Hash, Instant)>>>,
expected_responses: Arc<Mutex<Vec<(Type, Hash, Option<bool>, Instant)>>>,
}
impl TimeoutConnection {
@ -244,11 +244,15 @@ impl TimeoutConnection {
let recv_h = try!(handler.handle(sender, header, data));
let mut expects = exp.lock().unwrap();
println!("EXP1 {}", expects.len());
let filtered = expects.iter()
.filter(|&&(typ, h, _)| msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h)
.filter(|&&(typ, h, _, _)| {
msg_type != typ || recv_h.is_some() && recv_h.unwrap() != h
})
.map(|&x| x)
.collect::<Vec<_>>();
*expects = filtered;
println!("EXP2 {}", expects.len());
Ok(recv_h)
});
@ -259,7 +263,7 @@ impl TimeoutConnection {
.interval(Duration::new(2, 0))
.fold((), move |_, _| {
let exp = exp.lock().unwrap();
for &(_, _, t) in exp.deref() {
for &(_, _, _, t) in exp.deref() {
if Instant::now() - t > Duration::new(2, 0) {
return Err(TimerError::TooLong);
}
@ -280,15 +284,15 @@ impl TimeoutConnection {
pub fn send_request(&self,
t: Type,
body: &ser::Writeable,
expect_h: Option<Hash>)
expect_h: Option<(Type, Hash)>)
-> Result<(), ser::Error> {
let sent = try!(self.underlying.send_msg(t, body));
let mut expects = self.expected_responses.lock().unwrap();
if let Some(h) = expect_h {
expects.push((t, h, Instant::now()));
if let Some((rt, h)) = expect_h {
expects.push((rt, h, None, Instant::now()));
} else {
expects.push((t, ZERO_HASH, Instant::now()));
expects.push((t, ZERO_HASH, None, Instant::now()));
}
Ok(())
}

View file

@ -21,6 +21,7 @@ use rand::os::OsRng;
use tokio_core::net::TcpStream;
use core::ser::Error;
use core::core::target::Difficulty;
use msg::*;
use types::*;
use protocol::ProtocolV1;
@ -46,7 +47,7 @@ impl Handshake {
/// Handles connecting to a new remote peer, starting the version handshake.
pub fn connect(&self,
height: u64,
total_difficulty: Difficulty,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
// prepare the first part of the hanshake
@ -55,7 +56,7 @@ impl Handshake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
nonce: nonce,
height: height,
total_difficulty: total_difficulty,
sender_addr: SockAddr(conn.local_addr().unwrap()),
receiver_addr: SockAddr(conn.peer_addr().unwrap()),
user_agent: USER_AGENT.to_string(),
@ -76,7 +77,7 @@ impl Handshake {
user_agent: shake.user_agent,
addr: conn.peer_addr().unwrap(),
version: shake.version,
height: shake.height,
total_difficulty: shake.total_difficulty,
};
info!("Connected to peer {:?}", peer_info);
@ -89,7 +90,7 @@ impl Handshake {
/// Handles receiving a connection from a new remote peer that started the
/// version handshake.
pub fn handshake(&self,
height: u64,
total_difficulty: Difficulty,
conn: TcpStream)
-> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonces = self.nonces.clone();
@ -117,13 +118,13 @@ impl Handshake {
user_agent: hand.user_agent,
addr: conn.peer_addr().unwrap(),
version: hand.version,
height: hand.height,
total_difficulty: hand.total_difficulty,
};
// send our reply with our info
let shake = Shake {
version: PROTOCOL_VERSION,
capabilities: FULL_SYNC,
height: height,
total_difficulty: total_difficulty,
user_agent: USER_AGENT.to_string(),
};
Ok((conn, shake, peer_info))

View file

@ -47,4 +47,4 @@ mod types;
pub use server::{Server, DummyAdapter};
pub use peer::Peer;
pub use types::{P2PConfig, NetAdapter};
pub use types::{P2PConfig, NetAdapter, MAX_LOCATORS, MAX_BLOCK_HEADERS};

View file

@ -21,8 +21,11 @@ use futures::future::{Future, ok};
use tokio_core::net::TcpStream;
use tokio_core::io::{write_all, read_exact};
use core::ser::{self, Writeable, Readable, Writer, Reader};
use core::consensus::MAX_MSG_LEN;
use core::core::BlockHeader;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser::{self, Writeable, Readable, Writer, Reader};
use types::*;
@ -53,6 +56,9 @@ enum_from_primitive! {
Pong,
GetPeerAddrs,
PeerAddrs,
GetHeaders,
Headers,
GetBlock,
Block,
Transaction,
}
@ -177,8 +183,10 @@ pub struct Hand {
pub capabilities: Capabilities,
/// randomly generated for each handshake, helps detect self
pub nonce: u64,
/// current height of the sender, used to check whether sync may be needed
pub height: u64,
/// total difficulty accumulated by the sender, used to check whether sync
/// may
/// be needed
pub total_difficulty: Difficulty,
/// network address of the sender
pub sender_addr: SockAddr,
/// network address of the receiver
@ -192,8 +200,8 @@ impl Writeable for Hand {
ser_multiwrite!(writer,
[write_u32, self.version],
[write_u32, self.capabilities.bits()],
[write_u64, self.nonce],
[write_u64, self.height]);
[write_u64, self.nonce]);
self.total_difficulty.write(writer);
self.sender_addr.write(writer);
self.receiver_addr.write(writer);
writer.write_bytes(&self.user_agent)
@ -202,7 +210,8 @@ impl Writeable for Hand {
impl Readable<Hand> for Hand {
fn read(reader: &mut Reader) -> Result<Hand, ser::Error> {
let (version, capab, nonce, height) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_u64);
let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64);
let total_diff = try!(Difficulty::read(reader));
let sender_addr = try!(SockAddr::read(reader));
let receiver_addr = try!(SockAddr::read(reader));
let ua = try!(reader.read_vec());
@ -212,7 +221,7 @@ impl Readable<Hand> for Hand {
version: version,
capabilities: capabilities,
nonce: nonce,
height: height,
total_difficulty: total_diff,
sender_addr: sender_addr,
receiver_addr: receiver_addr,
user_agent: user_agent,
@ -227,8 +236,10 @@ pub struct Shake {
pub version: u32,
/// sender capabilities
pub capabilities: Capabilities,
/// current height of the sender, used to check whether sync may be needed
pub height: u64,
/// total difficulty accumulated by the sender, used to check whether sync
/// may
/// be needed
pub total_difficulty: Difficulty,
/// name of version of the software
pub user_agent: String,
}
@ -237,22 +248,24 @@ impl Writeable for Shake {
fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> {
ser_multiwrite!(writer,
[write_u32, self.version],
[write_u32, self.capabilities.bits()],
[write_u64, self.height],
[write_bytes, &self.user_agent]);
[write_u32, self.capabilities.bits()]);
self.total_difficulty.write(writer);
writer.write_bytes(&self.user_agent);
Ok(())
}
}
impl Readable<Shake> for Shake {
fn read(reader: &mut Reader) -> Result<Shake, ser::Error> {
let (version, capab, height, ua) = ser_multiread!(reader, read_u32, read_u32, read_u64, read_vec);
let (version, capab) = ser_multiread!(reader, read_u32, read_u32);
let total_diff = try!(Difficulty::read(reader));
let ua = try!(reader.read_vec());
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData));
Ok(Shake {
version: version,
capabilities: capabilities,
height: height,
total_difficulty: total_diff,
user_agent: user_agent,
})
}
@ -387,6 +400,58 @@ impl Readable<SockAddr> for SockAddr {
}
}
/// Serializable wrapper for the block locator.
pub struct Locator {
pub hashes: Vec<Hash>,
}
impl Writeable for Locator {
fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> {
writer.write_u8(self.hashes.len() as u8)?;
for h in &self.hashes {
h.write(writer)?
}
Ok(())
}
}
impl Readable<Locator> for Locator {
fn read(reader: &mut Reader) -> Result<Locator, ser::Error> {
let len = reader.read_u8()?;
let mut hashes = Vec::with_capacity(len as usize);
for _ in 0..len {
hashes.push(Hash::read(reader)?);
}
Ok(Locator { hashes: hashes })
}
}
/// Serializable wrapper for a list of block headers.
pub struct Headers {
pub headers: Vec<BlockHeader>,
}
impl Writeable for Headers {
fn write(&self, writer: &mut Writer) -> Result<(), ser::Error> {
writer.write_u16(self.headers.len() as u16)?;
for h in &self.headers {
h.write(writer)?
}
Ok(())
}
}
impl Readable<Headers> for Headers {
fn read(reader: &mut Reader) -> Result<Headers, ser::Error> {
let len = reader.read_u16()?;
let mut headers = Vec::with_capacity(len as usize);
for _ in 0..len {
headers.push(BlockHeader::read(reader)?);
}
Ok(Headers { headers: headers })
}
}
/// Placeholder for messages like Ping and Pong that don't send anything but
/// the header.
pub struct Empty {}

View file

@ -18,12 +18,14 @@ use futures::Future;
use tokio_core::net::TcpStream;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser::Error;
use handshake::Handshake;
use types::*;
pub struct Peer {
info: PeerInfo,
pub info: PeerInfo,
proto: Box<Protocol>,
}
@ -33,10 +35,10 @@ unsafe impl Send for Peer {}
impl Peer {
/// Initiates the handshake with another peer.
pub fn connect(conn: TcpStream,
height: u64,
total_difficulty: Difficulty,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let connect_peer = hs.connect(height, conn).and_then(|(conn, proto, info)| {
let connect_peer = hs.connect(total_difficulty, conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
@ -48,10 +50,10 @@ impl Peer {
/// Accept a handshake initiated by another peer.
pub fn accept(conn: TcpStream,
height: u64,
total_difficulty: Difficulty,
hs: &Handshake)
-> Box<Future<Item = (TcpStream, Peer), Error = Error>> {
let hs_peer = hs.handshake(height, conn).and_then(|(conn, proto, info)| {
let hs_peer = hs.handshake(total_difficulty, conn).and_then(|(conn, proto, info)| {
Ok((conn,
Peer {
info: info,
@ -68,7 +70,11 @@ impl Peer {
na: Arc<NetAdapter>)
-> Box<Future<Item = (), Error = Error>> {
self.proto.handle(conn, na)
let addr = self.info.addr;
Box::new(self.proto.handle(conn, na).and_then(move |_| {
info!("Client {} disconnected.", addr);
Ok(())
}))
}
/// Bytes sent and received by this peer to the remote peer.
@ -87,6 +93,15 @@ impl Peer {
self.proto.send_block(b)
}
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.proto.send_header_request(locator)
}
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.proto.send_block_request(h)
}
pub fn stop(&self) {
self.proto.close();
}

View file

@ -81,6 +81,14 @@ impl Protocol for ProtocolV1 {
self.send_msg(Type::Transaction, tx)
}
fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), ser::Error> {
self.send_request(Type::GetHeaders, &Locator { hashes: locator }, None)
}
fn send_block_request(&self, h: Hash) -> Result<(), ser::Error> {
self.send_request(Type::GetBlock, &h, Some((Type::Block, h)))
}
/// Close the connection to the remote peer
fn close(&self) {
// TODO some kind of shutdown signal
@ -97,15 +105,7 @@ impl ProtocolV1 {
body: &ser::Writeable,
expect_resp: Option<(Type, Hash)>)
-> Result<(), ser::Error> {
let sent = self.send_msg(t, body);
if let Err(e) = sent {
warn!("Couldn't send message to remote peer: {}", e);
} else if let Some(exp) = expect_resp {
let mut expects = self.expected_responses.lock().unwrap();
expects.push(exp);
}
Ok(())
self.conn.borrow().send_request(t, body, expect_resp)
}
}
@ -116,22 +116,58 @@ fn handle_payload(adapter: &NetAdapter,
-> Result<Option<Hash>, ser::Error> {
match header.msg_type {
Type::Ping => {
let data = try!(ser::ser_vec(&MsgHeader::new(Type::Pong, 0)));
let data = ser::ser_vec(&MsgHeader::new(Type::Pong, 0))?;
sender.send(data);
Ok(None)
}
Type::Pong => Ok(None),
Type::Transaction => {
let tx = try!(ser::deserialize::<core::Transaction>(&mut &buf[..]));
let tx = ser::deserialize::<core::Transaction>(&mut &buf[..])?;
adapter.transaction_received(tx);
Ok(None)
}
Type::GetBlock => {
let h = ser::deserialize::<Hash>(&mut &buf[..])?;
let bo = adapter.get_block(h);
if let Some(b) = bo {
// serialize and send the block over
let mut body_data = vec![];
try!(ser::serialize(&mut body_data, &b));
let mut data = vec![];
try!(ser::serialize(&mut data,
&MsgHeader::new(Type::Block, body_data.len() as u64)));
data.append(&mut body_data);
sender.send(data);
}
Ok(None)
}
Type::Block => {
let b = try!(ser::deserialize::<core::Block>(&mut &buf[..]));
let b = ser::deserialize::<core::Block>(&mut &buf[..])?;
let bh = b.hash();
adapter.block_received(b);
Ok(Some(bh))
}
Type::GetHeaders => {
// load headers from the locator
let loc = ser::deserialize::<Locator>(&mut &buf[..])?;
let headers = adapter.locate_headers(loc.hashes);
// serialize and send all the headers over
let mut body_data = vec![];
try!(ser::serialize(&mut body_data, &Headers { headers: headers }));
let mut data = vec![];
try!(ser::serialize(&mut data,
&MsgHeader::new(Type::Headers, body_data.len() as u64)));
data.append(&mut body_data);
sender.send(data);
Ok(None)
}
Type::Headers => {
let headers = ser::deserialize::<Headers>(&mut &buf[..])?;
adapter.headers_received(headers.headers);
Ok(None)
}
_ => {
debug!("unknown message type {:?}", header.msg_type);
Ok(None)

View file

@ -24,10 +24,13 @@ use std::time::Duration;
use futures;
use futures::{Future, Stream};
use futures::future::IntoFuture;
use rand::{self, Rng};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser::Error;
use handshake::Handshake;
use peer::Peer;
@ -36,9 +39,18 @@ use types::*;
/// A no-op network adapter used for testing.
pub struct DummyAdapter {}
impl NetAdapter for DummyAdapter {
fn height(&self) -> u64 { 0 }
fn total_difficulty(&self) -> Difficulty {
Difficulty::one()
}
fn transaction_received(&self, tx: core::Transaction) {}
fn block_received(&self, b: core::Block) {}
fn headers_received(&self, bh: Vec<core::BlockHeader>) {}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {
vec![]
}
fn get_block(&self, h: Hash) -> Option<core::Block> {
None
}
}
/// P2P server implementation, handling bootstrapping to find and connect to
@ -80,11 +92,11 @@ impl Server {
let hp = h.clone();
let peers = socket.incoming().map_err(|e| Error::IOErr(e)).map(move |(conn, addr)| {
let adapter = adapter.clone();
let height = adapter.height();
let total_diff = adapter.total_difficulty();
let peers = peers.clone();
// accept the peer and add it to the server map
let peer_accept = add_to_peers(peers, Peer::accept(conn, height, &hs.clone()));
let peer_accept = add_to_peers(peers, Peer::accept(conn, total_diff, &hs.clone()));
// wire in a future to timeout the accept after 5 secs
let timed_peer = with_timeout(Box::new(peer_accept), &hp);
@ -132,17 +144,45 @@ impl Server {
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::IOErr(e));
let request = socket.and_then(move |socket| {
let peers = peers.clone();
let height = adapter1.height();
let total_diff = adapter1.total_difficulty();
// connect to the peer and add it to the server map, wiring it a timeout for
// the handhake
let peer_connect = add_to_peers(peers, Peer::connect(socket, height, &Handshake::new()));
let peer_connect =
add_to_peers(peers, Peer::connect(socket, total_diff, &Handshake::new()));
with_timeout(Box::new(peer_connect), &h)
})
.and_then(move |(socket, peer)| peer.run(socket, adapter2));
Box::new(request)
}
/// Returns the peer with the most worked branch, showing the highest total
/// difficulty.
pub fn most_work_peer(&self) -> Option<Arc<Peer>> {
let peers = self.peers.read().unwrap();
if peers.len() == 0 {
return None;
}
let mut res = peers[0].clone();
for p in peers.deref() {
if res.info.total_difficulty < p.info.total_difficulty {
res = (*p).clone();
}
}
Some(res)
}
/// Returns a random peer we're connected to.
pub fn random_peer(&self) -> Option<Arc<Peer>> {
let peers = self.peers.read().unwrap();
if peers.len() == 0 {
None
} else {
let idx = rand::thread_rng().gen_range(0, peers.len());
Some(peers[idx].clone())
}
}
/// Broadcasts the provided block to all our peers. A peer implementation
/// may drop the broadcast request if it knows the remote peer already has
/// the block.
@ -156,7 +196,7 @@ impl Server {
}
/// Number of peers we're currently connected to.
pub fn peers_count(&self) -> u32 {
pub fn peer_count(&self) -> u32 {
self.peers.read().unwrap().len() as u32
}

View file

@ -19,8 +19,19 @@ use futures::Future;
use tokio_core::net::TcpStream;
use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser::Error;
/// Maximum number of hashes in a block header locator request
pub const MAX_LOCATORS: u32 = 10;
/// Maximum number of block headers a peer should ever send
pub const MAX_BLOCK_HEADERS: u32 = 512;
/// Maximum number of block bodies a peer should ever ask for and send
pub const MAX_BLOCK_BODIES: u32 = 16;
/// Configuration for the peer-to-peer server.
#[derive(Debug, Clone, Copy)]
pub struct P2PConfig {
@ -56,7 +67,7 @@ pub struct PeerInfo {
pub user_agent: String,
pub version: u32,
pub addr: SocketAddr,
pub height: u64,
pub total_difficulty: Difficulty,
}
/// A given communication protocol agreed upon between 2 peers (usually
@ -81,6 +92,12 @@ pub trait Protocol {
/// Relays a transaction to the remote peer.
fn send_transaction(&self, tx: &core::Transaction) -> Result<(), Error>;
/// Sends a request for block headers based on the provided block locator.
fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error>;
/// Sends a request for a block from its hash.
fn send_block_request(&self, h: Hash) -> Result<(), Error>;
/// How many bytes have been sent/received to/from the remote peer.
fn transmitted_bytes(&self) -> (u64, u64);
@ -92,12 +109,25 @@ pub trait Protocol {
/// forwarding or querying of blocks and transactions from the network among
/// other things.
pub trait NetAdapter: Sync + Send {
/// Current height of our chain.
fn height(&self) -> u64;
/// Current height of our chain.
fn total_difficulty(&self) -> Difficulty;
/// A valid transaction has been received from one of our peers
fn transaction_received(&self, tx: core::Transaction);
/// A block has been received from one of our peers
fn block_received(&self, b: core::Block);
/// A set of block header has been received, typically in response to a
/// block
/// header request.
fn headers_received(&self, bh: Vec<core::BlockHeader>);
/// Finds a list of block headers based on the provided locator. Tries to
/// identify the common chain and gets the headers that follow it
/// immediately.
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader>;
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block>;
}

View file

@ -27,6 +27,7 @@ use tokio_core::net::TcpStream;
use tokio_core::reactor::{self, Core};
use core::ser;
use core::core::target::Difficulty;
use p2p::Peer;
// Starts a server and connects a client peer to it to check handshake, followed by a ping/pong exchange to make sure the connection is live.
@ -50,7 +51,7 @@ fn peer_handshake() {
let addr = SocketAddr::new(p2p_conf.host, p2p_conf.port);
let socket = TcpStream::connect(&addr, &phandle).map_err(|e| ser::Error::IOErr(e));
socket.and_then(move |socket| {
Peer::connect(socket, 0, &p2p::handshake::Handshake::new())
Peer::connect(socket, Difficulty::one(), &p2p::handshake::Handshake::new())
}).and_then(move |(socket, peer)| {
rhandle.spawn(peer.run(socket, net_adapter.clone()).map_err(|e| {
panic!("Client run failed: {}", e);
@ -63,7 +64,7 @@ fn peer_handshake() {
assert!(recv > 0);
Ok(())
}).and_then(|_| {
assert!(server.peers_count() > 0);
assert!(server.peer_count() > 0);
server.stop();
Ok(())
})