port across "simple_sync" changes from testnet1 branch ()

* port across "simple_sync" changes from testnet1 branch
Cleanup direct refs to peer map or peer store
P2P server acts as a facade, handling the list of connected peers
and the storage of their information. Everything else goes through
the p2p server instead of having a peer map reference or going
straight to the store.
Fix p2p tests

* fix "monitoring peers" log msg (use connected_peers)
This commit is contained in:
AntiochP 2017-11-30 10:27:50 -05:00 committed by GitHub
parent 90a1187bf7
commit 442ef3b255
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 300 additions and 392 deletions

View file

@ -229,12 +229,12 @@ impl Handler for SumTreeHandler {
}
pub struct PeersAllHandler {
pub peer_store: Arc<p2p::PeerStore>,
pub p2p_server: Arc<p2p::Server>,
}
impl Handler for PeersAllHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
let peers = &self.peer_store.all_peers();
let peers = &self.p2p_server.all_peers();
json_response_pretty(&peers)
}
}
@ -246,7 +246,7 @@ pub struct PeersConnectedHandler {
impl Handler for PeersConnectedHandler {
fn handle(&self, _req: &mut Request) -> IronResult<Response> {
let mut peers = vec![];
for p in &self.p2p_server.all_peers() {
for p in &self.p2p_server.connected_peers() {
let p = p.read().unwrap();
let peer_info = p.info.clone();
peers.push(peer_info);
@ -374,7 +374,6 @@ pub fn start_rest_apis<T>(
chain: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<T>>>,
p2p_server: Arc<p2p::Server>,
peer_store: Arc<p2p::PeerStore>,
) where
T: pool::BlockChain + Send + Sync + 'static,
{
@ -396,7 +395,7 @@ pub fn start_rest_apis<T>(
tx_pool: tx_pool.clone(),
};
let peers_all_handler = PeersAllHandler {
peer_store: peer_store.clone(),
p2p_server: p2p_server.clone(),
};
let peers_connected_handler = PeersConnectedHandler {
p2p_server: p2p_server.clone(),

View file

@ -53,7 +53,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
debug!(
LOGGER,
"Processing block {} at {} with {} inputs and {} outputs.",
"pipe: process_block {} at {} with {} inputs and {} outputs.",
b.hash(),
b.header.height,
b.inputs.len(),
@ -78,9 +78,9 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
validate_block(b, &mut ctx, &mut extension)?;
debug!(
LOGGER,
"Block at {} with hash {} is valid, going to save and append.",
"pipe: proces_block {} at {} is valid, save and append.",
b.hash(),
b.header.height,
b.hash()
);
add_block(b, &mut ctx)?;
@ -96,7 +96,7 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
pub fn process_block_header(bh: &BlockHeader, mut ctx: BlockContext) -> Result<Option<Tip>, Error> {
debug!(
LOGGER,
"Processing header {} at {}.",
"pipe: process_header {} at {}",
bh.hash(),
bh.height
);
@ -119,7 +119,7 @@ fn check_known(bh: Hash, ctx: &mut BlockContext) -> Result<(), Error> {
}
if let Ok(b) = ctx.store.get_block(&bh) {
// there is a window where a block can be saved but the chain head not
// updated yet, we plug that window here by re-accepting the block
// updated yet, we plug that window here by re-accepting the block
if b.header.total_difficulty <= ctx.head.total_difficulty {
return Err(Error::Unfit("already in store".to_string()));
}
@ -157,7 +157,7 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E
if !ctx.opts.intersects(SKIP_POW) {
let cycle_size = global::sizeshift();
debug!(LOGGER, "Validating block with cuckoo size {}", cycle_size);
debug!(LOGGER, "pipe: validate_header cuckoo size {}", cycle_size);
if !(ctx.pow_verifier)(header, cycle_size as u32) {
return Err(Error::InvalidPow);
}
@ -220,7 +220,7 @@ fn validate_block(
ext.apply_block(b)?;
} else {
// extending a fork, first identify the block where forking occurred
// keeping the hashes of blocks along the fork
// keeping the hashes of blocks along the fork
let mut current = b.header.previous;
let mut hashes = vec![];
loop {
@ -290,7 +290,7 @@ fn validate_block(
.get_block_header_by_output_commit(&input.commitment())
{
// TODO - make sure we are not off-by-1 here vs. the equivalent tansaction
// validation rule
// validation rule
if b.header.height <= output_header.height + global::coinbase_maturity() {
return Err(Error::ImmatureCoinbase);
}
@ -321,7 +321,7 @@ fn add_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Erro
/// work than the head.
fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
// if we made a fork with more work than the head (which should also be true
// when extending the head), update it
// when extending the head), update it
let tip = Tip::from_block(&b.header);
if tip.total_difficulty > ctx.head.total_difficulty {
// update the block height index
@ -330,8 +330,8 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error>
.map_err(|e| Error::StoreErr(e, "pipe setup height".to_owned()))?;
// in sync mode, only update the "body chain", otherwise update both the
// "header chain" and "body chain", updating the header chain in sync resets
// all additional "future" headers we've received
// "header chain" and "body chain", updating the header chain in sync resets
// all additional "future" headers we've received
if ctx.opts.intersects(SYNC) {
ctx.store
.save_body_head(&tip)
@ -359,7 +359,7 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error>
/// work than the head.
fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
// if we made a fork with more work than the head (which should also be true
// when extending the head), update it
// when extending the head), update it
let tip = Tip::from_block(bh);
if tip.total_difficulty > ctx.head.total_difficulty {
ctx.store
@ -372,7 +372,7 @@ fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option
"Updated block header head to {} at {}.",
bh.hash(),
bh.height
);
);
Ok(Some(tip))
} else {
Ok(None)

View file

@ -12,22 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
use std::sync::atomic::{AtomicBool, Ordering};
use chain::{self, ChainAdapter};
use core::core::{self, Output};
use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use p2p::{self, NetAdapter, Peer, PeerData, PeerStore, Server, State};
use p2p::{self, NetAdapter, PeerData, State};
use pool;
use util::secp::pedersen::Commitment;
use util::OneTime;
use store;
use sync;
use util::LOGGER;
/// Implementation of the NetAdapter for the blockchain. Gets notified when new
@ -35,10 +33,9 @@ use util::LOGGER;
/// implementations.
pub struct NetToChainAdapter {
chain: Arc<chain::Chain>,
peer_store: Arc<PeerStore>,
connected_peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
p2p_server: OneTime<Arc<p2p::Server>>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
syncer: OneTime<Arc<sync::Syncer>>,
syncing: AtomicBool,
}
impl NetAdapter for NetToChainAdapter {
@ -79,20 +76,14 @@ impl NetAdapter for NetToChainAdapter {
if let &Err(ref e) = &res {
debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
}
if self.syncing() {
// always notify the syncer we received a block
// otherwise we jam up the 8 download slots with orphans
debug!(LOGGER, "adapter: notifying syncer: received block {:?}", bhash);
self.syncer.borrow().block_received(bhash);
}
}
fn headers_received(&self, bhs: Vec<core::BlockHeader>) {
fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) {
info!(
LOGGER,
"Received {} block headers",
bhs.len(),
"Received block headers {:?} from {}",
bhs.iter().map(|x| x.hash()).collect::<Vec<_>>(),
addr,
);
// try to add each header to our header chain
@ -133,10 +124,6 @@ impl NetAdapter for NetToChainAdapter {
"Added {} headers to the header chain.",
added_hs.len()
);
if self.syncing() {
self.syncer.borrow().headers_received(added_hs);
}
}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {
@ -207,7 +194,7 @@ impl NetAdapter for NetToChainAdapter {
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: p2p::Capabilities) -> Vec<SocketAddr> {
let peers = self.peer_store
let peers = self.p2p_server.borrow()
.find_peers(State::Healthy, capab, p2p::MAX_PEER_ADDRS as usize);
debug!(LOGGER, "Got {} peer addrs to send.", peers.len());
map_vec!(peers, |p| p.addr)
@ -217,7 +204,7 @@ impl NetAdapter for NetToChainAdapter {
fn peer_addrs_received(&self, peer_addrs: Vec<SocketAddr>) {
debug!(LOGGER, "Received {} peer addrs, saving.", peer_addrs.len());
for pa in peer_addrs {
if let Ok(e) = self.peer_store.exists_peer(pa) {
if let Ok(e) = self.p2p_server.borrow().exists_peer(pa) {
if e {
continue;
}
@ -228,7 +215,7 @@ impl NetAdapter for NetToChainAdapter {
user_agent: "".to_string(),
flags: State::Healthy,
};
if let Err(e) = self.peer_store.save_peer(&peer) {
if let Err(e) = self.p2p_server.borrow().save_peer(&peer) {
error!(LOGGER, "Could not save received peer address: {:?}", e);
}
}
@ -243,7 +230,7 @@ impl NetAdapter for NetToChainAdapter {
user_agent: pi.user_agent.clone(),
flags: State::Healthy,
};
if let Err(e) = self.peer_store.save_peer(&peer) {
if let Err(e) = self.p2p_server.borrow().save_peer(&peer) {
error!(LOGGER, "Could not save connected peer: {:?}", e);
}
}
@ -258,8 +245,7 @@ impl NetAdapter for NetToChainAdapter {
);
if diff.into_num() > 0 {
let peers = self.connected_peers.read().unwrap();
if let Some(peer) = peers.get(&addr) {
if let Some(peer) = self.p2p_server.borrow().get_peer(&addr) {
let mut peer = peer.write().unwrap();
peer.info.total_difficulty = diff;
}
@ -271,40 +257,54 @@ impl NetToChainAdapter {
pub fn new(
chain_ref: Arc<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peer_store: Arc<PeerStore>,
connected_peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
) -> NetToChainAdapter {
NetToChainAdapter {
chain: chain_ref,
peer_store: peer_store,
connected_peers: connected_peers,
p2p_server: OneTime::new(),
tx_pool: tx_pool,
syncer: OneTime::new(),
syncing: AtomicBool::new(true),
}
}
/// Start syncing the chain by instantiating and running the Syncer in the
/// background (a new thread is created).
pub fn start_sync(&self, sync: sync::Syncer) {
let arc_sync = Arc::new(sync);
self.syncer.init(arc_sync.clone());
let _ = thread::Builder::new()
.name("syncer".to_string())
.spawn(move || {
let res = arc_sync.run();
if let Err(e) = res {
panic!("Error during sync, aborting: {:?}", e);
}
});
/// Setup the p2p server on the adapter
pub fn init(&self, p2p: Arc<p2p::Server>) {
self.p2p_server.init(p2p);
}
pub fn syncing(&self) -> bool {
self.syncer.is_initialized() && self.syncer.borrow().syncing()
/// Whether we're currently syncing the chain or we're fully caught up and
/// just receiving blocks through gossip.
pub fn is_syncing(&self) -> bool {
let local_diff = self.total_difficulty();
let peers = self.p2p_server.borrow().connected_peers();
// if we're already syncing, we're caught up if no peer has a higher
// difficulty than us
if self.syncing.load(Ordering::Relaxed) {
let higher_diff = peers.iter().any(|p| {
let p = p.read().unwrap();
p.info.total_difficulty > local_diff
});
if !higher_diff {
info!(LOGGER, "sync: caught up on the most worked chain, disabling sync");
self.syncing.store(false, Ordering::Relaxed);
}
} else {
// if we're not syncing, we need to if our difficulty is much too low
let higher_diff_padded = peers.iter().any(|p| {
let p = p.read().unwrap();
p.info.total_difficulty > local_diff.clone() + Difficulty::from_num(1000)
});
if higher_diff_padded {
info!(LOGGER, "sync: late on the most worked chain, enabling sync");
self.syncing.store(true, Ordering::Relaxed);
}
}
self.syncing.load(Ordering::Relaxed)
}
/// Prepare options for the chain pipeline
fn chain_opts(&self) -> chain::Options {
let opts = if self.syncing() {
let opts = if self.is_syncing() {
chain::SYNC
} else {
chain::NONE
@ -318,7 +318,7 @@ impl NetToChainAdapter {
/// the network to broadcast the block
pub struct ChainToPoolAndNetAdapter {
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
p2p: OneTime<Arc<Server>>,
p2p: OneTime<Arc<p2p::Server>>,
}
impl ChainAdapter for ChainToPoolAndNetAdapter {
@ -346,7 +346,7 @@ impl ChainToPoolAndNetAdapter {
p2p: OneTime::new(),
}
}
pub fn init(&self, p2p: Arc<Server>) {
pub fn init(&self, p2p: Arc<p2p::Server>) {
self.p2p.init(p2p);
}
}
@ -354,7 +354,7 @@ impl ChainToPoolAndNetAdapter {
/// Adapter between the transaction pool and the network, to relay
/// transactions that have been accepted.
pub struct PoolToNetAdapter {
p2p: OneTime<Arc<Server>>,
p2p: OneTime<Arc<p2p::Server>>,
}
impl pool::PoolAdapter for PoolToNetAdapter {
@ -372,7 +372,7 @@ impl PoolToNetAdapter {
}
/// Setup the p2p server on the adapter
pub fn init(&self, p2p: Arc<Server>) {
pub fn init(&self, p2p: Arc<p2p::Server>) {
self.p2p.init(p2p);
}
}

View file

@ -39,7 +39,6 @@ const PEER_PREFERRED_COUNT: u32 = 8;
const SEEDS_URL: &'static str = "http://grin-tech.org/seeds.txt";
pub struct Seeder {
peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>,
capabilities: p2p::Capabilities,
@ -48,11 +47,9 @@ pub struct Seeder {
impl Seeder {
pub fn new(
capabilities: p2p::Capabilities,
peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>,
) -> Seeder {
Seeder {
peer_store: peer_store,
p2p: p2p,
capabilities: capabilities,
}
@ -82,7 +79,6 @@ impl Seeder {
&self,
tx: mpsc::UnboundedSender<SocketAddr>,
) -> Box<Future<Item = (), Error = String>> {
let peer_store = self.peer_store.clone();
let p2p_server = self.p2p.clone();
// now spawn a new future to regularly check if we need to acquire more peers
@ -90,7 +86,7 @@ impl Seeder {
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(10))
.for_each(move |_| {
debug!(LOGGER, "monitoring peers ({})", p2p_server.all_peers().len());
debug!(LOGGER, "monitoring peers ({})", p2p_server.connected_peers().len());
// maintenance step first, clean up p2p server peers and mark bans
// if needed
@ -100,7 +96,7 @@ impl Seeder {
if p.is_banned() {
debug!(LOGGER, "Marking peer {} as banned.", p.info.addr);
let update_result =
peer_store.update_state(p.info.addr, p2p::State::Banned);
p2p_server.update_state(p.info.addr, p2p::State::Banned);
match update_result {
Ok(()) => {}
Err(_) => {}
@ -110,12 +106,12 @@ impl Seeder {
// we don't have enough peers, getting more from db
if p2p_server.peer_count() < PEER_PREFERRED_COUNT {
let mut peers = peer_store.find_peers(
let mut peers = p2p_server.find_peers(
p2p::State::Healthy,
p2p::UNKNOWN,
(2 * PEER_MAX_COUNT) as usize,
);
peers.retain(|p| !p2p_server.is_known(p.addr));
peers.retain(|p| !p2p_server.is_known(&p.addr));
if peers.len() > 0 {
debug!(
LOGGER,
@ -137,21 +133,21 @@ impl Seeder {
}
// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
// otherwise use the seeds provided.
fn connect_to_seeds(
&self,
tx: mpsc::UnboundedSender<SocketAddr>,
seed_list: Box<Future<Item = Vec<SocketAddr>, Error = String>>,
) -> Box<Future<Item = (), Error = String>> {
let peer_store = self.peer_store.clone();
// a thread pool is required so we don't block the event loop with a
// db query
// db query
let thread_pool = cpupool::CpuPool::new(1);
let p2p_server = self.p2p.clone();
let seeder = thread_pool
.spawn_fn(move || {
// check if we have some peers in db
let peers = peer_store.find_peers(
let peers = p2p_server.find_peers(
p2p::State::Healthy,
p2p::FULL_HIST,
(2 * PEER_MAX_COUNT) as usize,
@ -192,7 +188,6 @@ impl Seeder {
rx: mpsc::UnboundedReceiver<SocketAddr>,
) -> Box<Future<Item = (), Error = ()>> {
let capab = self.capabilities;
let p2p_store = self.peer_store.clone();
let p2p_server = self.p2p.clone();
let listener = rx.for_each(move |peer_addr| {
@ -202,7 +197,6 @@ impl Seeder {
h.spawn(
connect_and_req(
capab,
p2p_store.clone(),
p2p_server.clone(),
inner_h,
peer_addr,
@ -271,7 +265,6 @@ pub fn predefined_seeds(
fn connect_and_req(
capab: p2p::Capabilities,
peer_store: Arc<p2p::PeerStore>,
p2p: Arc<p2p::Server>,
h: reactor::Handle,
addr: SocketAddr,
@ -279,6 +272,7 @@ fn connect_and_req(
let connect_peer = p2p.connect_peer(addr, h).map_err(|_| ());
let timer = Timer::default();
let timeout = timer.timeout(connect_peer, Duration::from_secs(5));
let p2p_server = p2p.clone();
let fut = timeout.then(move |p| {
match p {
@ -291,7 +285,7 @@ fn connect_and_req(
}
}
Err(_) => {
let update_result = peer_store.update_state(addr, p2p::State::Defunct);
let update_result = p2p_server.update_state(addr, p2p::State::Defunct);
match update_result {
Ok(()) => {}
Err(_) => {}

View file

@ -16,7 +16,6 @@
//! the peer-to-peer server, the blockchain and the transaction pool) and acts
//! as a facade.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
@ -109,29 +108,23 @@ impl Server {
pool_adapter.set_chain(shared_chain.clone());
// Currently connected peers. Used by both the net_adapter and the p2p_server.
let connected_peers = Arc::new(RwLock::new(HashMap::new()));
let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?);
let net_adapter = Arc::new(NetToChainAdapter::new(
shared_chain.clone(),
tx_pool.clone(),
peer_store.clone(),
connected_peers.clone(),
));
let p2p_server = Arc::new(p2p::Server::new(
config.db_root.clone(),
config.capabilities,
config.p2p_config.unwrap(),
connected_peers.clone(),
net_adapter.clone(),
genesis.hash(),
));
)?);
chain_adapter.init(p2p_server.clone());
pool_net_adapter.init(p2p_server.clone());
net_adapter.init(p2p_server.clone());
let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone());
let seed = seed::Seeder::new(config.capabilities, p2p_server.clone());
match config.seeding_type.clone() {
Seeding::None => {
warn!(
@ -158,11 +151,11 @@ impl Server {
_ => {}
}
// If we have any known seeds or peers then attempt to sync.
if config.seeding_type != Seeding::None || peer_store.all_peers().len() > 0 {
let sync = sync::Syncer::new(shared_chain.clone(), p2p_server.clone());
net_adapter.start_sync(sync);
}
sync::run_sync(
net_adapter.clone(),
p2p_server.clone(),
shared_chain.clone(),
);
evt_handle.spawn(p2p_server.start(evt_handle.clone()).map_err(|_| ()));
@ -173,7 +166,6 @@ impl Server {
shared_chain.clone(),
tx_pool.clone(),
p2p_server.clone(),
peer_store.clone(),
);
warn!(LOGGER, "Grin server started.");
@ -214,8 +206,10 @@ impl Server {
let mut miner = miner::Miner::new(config.clone(), self.chain.clone(), self.tx_pool.clone());
miner.set_debug_output_id(format!("Port {}", self.config.p2p_config.unwrap().port));
thread::spawn(move || {
// TODO push this down in the run loop so miner gets paused anytime we
// decide to sync again
let secs_5 = time::Duration::from_secs(5);
while net_adapter.syncing() {
while net_adapter.is_syncing() {
thread::sleep(secs_5);
}
miner.run_loop(config.clone(), cuckoo_size as u32, proof_size);

View file

@ -1,4 +1,4 @@
// Copyright 2016 The Grin Developers
// Copyright 2017 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,294 +12,182 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Synchronization of the local blockchain with the rest of the network. Used
//! either on a brand new node or when a node is late based on others' heads.
//! Always starts by downloading the header chain before asking either for full
//! blocks or a full UTXO set with related information.
/// How many block bodies to download in parallel
const MAX_BODY_DOWNLOADS: usize = 8;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use std::sync::{Arc, RwLock};
use core::core::hash::{Hash, Hashed};
use adapters::NetToChainAdapter;
use chain;
use p2p;
use core::core::hash::{Hash, Hashed};
use p2p::{self, Peer, NetAdapter};
use types::Error;
use util::LOGGER;
#[derive(Debug)]
struct BlockDownload {
hash: Hash,
start_time: Instant,
retries: u8,
}
/// Manages syncing the local chain with other peers. Needs both a head chain
/// and a full block chain to operate. First tries to advance the header
/// chain as much as possible, then downloads the full blocks by batches.
pub struct Syncer {
/// Starts the syncing loop, just spawns two threads that loop forever
pub fn run_sync(
adapter: Arc<NetToChainAdapter>,
p2p_server: Arc<p2p::Server>,
chain: Arc<chain::Chain>,
p2p: Arc<p2p::Server>,
sync: Mutex<bool>,
last_header_req: Mutex<Instant>,
blocks_to_download: Mutex<Vec<Hash>>,
blocks_downloading: Mutex<Vec<BlockDownload>>,
}
impl Syncer {
pub fn new(chain_ref: Arc<chain::Chain>, p2p: Arc<p2p::Server>) -> Syncer {
Syncer {
chain: chain_ref,
p2p: p2p,
sync: Mutex::new(true),
last_header_req: Mutex::new(Instant::now() - Duration::from_secs(2)),
blocks_to_download: Mutex::new(vec![]),
blocks_downloading: Mutex::new(vec![]),
}
}
pub fn syncing(&self) -> bool {
*self.sync.lock().unwrap()
}
/// Checks the local chain state, comparing it with our peers and triggers
/// syncing if required.
pub fn run(&self) -> Result<(), Error> {
info!(LOGGER, "Sync: starting sync");
// Loop for 10s waiting for some peers to potentially sync from.
let start = Instant::now();
loop {
let pc = self.p2p.peer_count();
if pc > 3 {
break;
}
if Instant::now() - start > Duration::from_secs(10) {
break;
}
thread::sleep(Duration::from_millis(200));
}
// Now check we actually have at least one peer to sync from.
// If not then end the sync cleanly.
if self.p2p.peer_count() == 0 {
info!(LOGGER, "Sync: no peers to sync with, done.");
let mut sync = self.sync.lock().unwrap();
*sync = false;
return Ok(())
}
// check if we have missing full blocks for which we already have a header
self.init_download()?;
// main syncing loop, requests more headers and bodies periodically as long
// as a peer with higher difficulty exists and we're not fully caught up
info!(LOGGER, "Sync: Starting loop.");
loop {
let tip = self.chain.get_header_head()?;
// TODO do something better (like trying to get more) if we lose peers
let peer = self.p2p.most_work_peer().expect("No peers available for sync.");
let peer = peer.read().unwrap();
debug!(
LOGGER,
"Sync: peer {} vs us {}",
peer.info.total_difficulty,
tip.total_difficulty
);
let more_headers = peer.info.total_difficulty > tip.total_difficulty;
let more_bodies = {
let blocks_to_download = self.blocks_to_download.lock().unwrap();
let blocks_downloading = self.blocks_downloading.lock().unwrap();
debug!(
LOGGER,
"Sync: blocks to download {}, block downloading {}",
blocks_to_download.len(),
blocks_downloading.len(),
);
blocks_to_download.len() > 0 || blocks_downloading.len() > 0
};
{
let last_header_req = self.last_header_req.lock().unwrap().clone();
if more_headers || (Instant::now() - Duration::from_secs(30) > last_header_req) {
self.request_headers()?;
) {
let a_inner = adapter.clone();
let p2p_inner = p2p_server.clone();
let c_inner = chain.clone();
let _ = thread::Builder::new()
.name("body_sync".to_string())
.spawn(move || {
loop {
if a_inner.is_syncing() {
body_sync(p2p_inner.clone(), c_inner.clone());
} else {
thread::sleep(Duration::from_secs(5));
}
}
if more_bodies {
self.request_bodies();
});
let _ = thread::Builder::new()
.name("header_sync".to_string())
.spawn(move || {
loop {
if adapter.is_syncing() {
header_sync(adapter.clone(), p2p_server.clone(), chain.clone());
} else {
thread::sleep(Duration::from_secs(5));
}
}
if !more_headers && !more_bodies {
// TODO check we haven't been lied to on the total work
let mut sync = self.sync.lock().unwrap();
*sync = false;
});
}
fn body_sync(
p2p_server: Arc<p2p::Server>,
chain: Arc<chain::Chain>,
) {
debug!(LOGGER, "block_sync: loop");
let header_head = chain.get_header_head().unwrap();
let block_header = chain.head_header().unwrap();
let mut hashes = vec![];
if header_head.total_difficulty > block_header.total_difficulty {
let mut current = chain.get_block_header(&header_head.last_block_h);
while let Ok(header) = current {
if header.hash() == block_header.hash() {
break;
}
thread::sleep(Duration::from_secs(2));
hashes.push(header.hash());
current = chain.get_block_header(&header.previous);
}
info!(LOGGER, "Sync: done.");
Ok(())
}
hashes.reverse();
/// Checks the gap between the header chain and the full block chain and
/// initializes the blocks_to_download structure with the missing full
/// blocks
fn init_download(&self) -> Result<(), Error> {
// compare the header's head to the full one to see what we're missing
let header_head = self.chain.get_header_head()?;
let full_head = self.chain.head()?;
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
// go back the chain and insert for download all blocks we only have the
// head for
let mut prev_h = header_head.last_block_h;
while prev_h != full_head.last_block_h {
let header = self.chain.get_block_header(&prev_h)?;
if header.height < full_head.height {
break;
}
blocks_to_download.push(header.hash());
prev_h = header.previous;
}
let hashes_to_get = hashes
.iter()
.filter(|x| !chain.get_block(&x).is_ok())
.take(10)
.cloned()
.collect::<Vec<_>>();
if hashes_to_get.len() > 0 {
debug!(
LOGGER,
"Sync: Added {} full block hashes to download.",
blocks_to_download.len()
);
Ok(())
}
"block_sync: requesting blocks ({}/{}), {:?}",
block_header.height,
header_head.height,
hashes_to_get,
);
/// Asks for the blocks we haven't downloaded yet and place them in the
/// downloading structure.
fn request_bodies(&self) {
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
let mut blocks_downloading = self.blocks_downloading.lock().unwrap();
// retry blocks not downloading
let now = Instant::now();
for download in blocks_downloading.deref_mut() {
let elapsed = (now - download.start_time).as_secs();
if download.retries >= 12 {
panic!("Failed to download required block {}", download.hash);
}
if download.retries < (elapsed / 5) as u8 {
debug!(
LOGGER,
"Sync: Retry {} on block {}",
download.retries,
download.hash
);
self.request_block(download.hash);
download.retries += 1;
for hash in hashes_to_get.clone() {
let peer = if hashes_to_get.len() < 100 {
p2p_server.most_work_peer()
} else {
p2p_server.random_peer()
};
if let Some(peer) = peer {
let peer = peer.read().unwrap();
if let Err(e) = peer.send_block_request(hash) {
debug!(LOGGER, "block_sync: error requesting block: {:?}, {:?}", hash, e);
}
}
}
// consume hashes from blocks to download, place them in downloading and
// request them from the network
let mut count = 0;
while blocks_to_download.len() > 0 && blocks_downloading.len() < MAX_BODY_DOWNLOADS {
let h = blocks_to_download.pop().unwrap();
self.request_block(h);
count += 1;
blocks_downloading.push(BlockDownload {
hash: h,
start_time: Instant::now(),
retries: 0,
});
}
debug!(
LOGGER,
"Sync: Requested {} full blocks to download, total left: {}. Current list: {:?}.",
count,
blocks_to_download.len(),
blocks_downloading.deref(),
);
thread::sleep(Duration::from_secs(1));
} else {
thread::sleep(Duration::from_secs(5));
}
}
/// We added a block, clean up the downloading structure
pub fn block_received(&self, bh: Hash) {
// just clean up the downloading list
let mut bds = self.blocks_downloading.lock().unwrap();
bds.iter()
.position(|ref h| h.hash == bh)
.map(|n| bds.remove(n));
}
pub fn header_sync(
adapter: Arc<NetToChainAdapter>,
p2p_server: Arc<p2p::Server>,
chain: Arc<chain::Chain>,
) {
debug!(LOGGER, "header_sync: loop");
/// Request some block headers from a peer to advance us
fn request_headers(&self) -> Result<(), Error> {
{
let mut last_header_req = self.last_header_req.lock().unwrap();
*last_header_req = Instant::now();
}
let difficulty = adapter.total_difficulty();
let tip = self.chain.get_header_head()?;
let peer = self.p2p.most_work_peer();
let locator = self.get_locator(&tip)?;
if let Some(p) = peer {
let p = p.read().unwrap();
if let Some(peer) = p2p_server.most_work_peer() {
let peer = peer.clone();
let p = peer.read().unwrap();
let peer_difficulty = p.info.total_difficulty.clone();
if peer_difficulty > difficulty {
debug!(
LOGGER,
"Sync: Asking peer {} for more block headers, locator: {:?}",
p.info.addr,
locator,
);
if let Err(e) = p.send_header_request(locator) {
debug!(LOGGER, "Sync: peer error, will retry");
}
} else {
warn!(LOGGER, "Sync: Could not get most worked peer to request headers.");
}
Ok(())
}
"header_sync: difficulty {} vs {}",
peer_difficulty,
difficulty,
);
/// We added a header, add it to the full block download list
pub fn headers_received(&self, bhs: Vec<Hash>) {
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
for h in bhs {
// enlist for full block download
blocks_to_download.insert(0, h);
}
// we may still have more headers to retrieve but the main loop
// will take care of this for us
}
/// Builds a vector of block hashes that should help the remote peer sending
/// us the right block headers.
fn get_locator(&self, tip: &chain::Tip) -> Result<Vec<Hash>, Error> {
let heights = get_locator_heights(tip.height);
debug!(LOGGER, "Sync: locator heights: {:?}", heights);
let locator = heights
.into_iter()
.map(|h| self.chain.get_header_by_height(h))
.filter(|h| h.is_ok())
.map(|h| h.unwrap().hash())
.collect();
debug!(LOGGER, "Sync: locator: {:?}", locator);
Ok(locator)
}
/// Pick a random peer and ask for a block by hash
fn request_block(&self, h: Hash) {
if let Some(peer) = self.p2p.random_peer() {
let peer = peer.read().unwrap();
if let Err(e) = peer.send_block_request(h) {
debug!(LOGGER, "Sync: Error requesting block: {:?}", e);
}
let _ = request_headers(
peer.clone(),
chain.clone(),
);
}
}
thread::sleep(Duration::from_secs(30));
}
/// Request some block headers from a peer to advance us
fn request_headers(
peer: Arc<RwLock<Peer>>,
chain: Arc<chain::Chain>,
) -> Result<(), Error> {
let locator = get_locator(chain)?;
let peer = peer.read().unwrap();
debug!(
LOGGER,
"Sync: Asking peer {} for more block headers, locator: {:?}",
peer.info.addr,
locator,
);
let _ = peer.send_header_request(locator);
Ok(())
}
fn get_locator(chain: Arc<chain::Chain>) -> Result<Vec<Hash>, Error> {
let tip = chain.get_header_head()?;
// TODO - is this necessary?
// go back to earlier header height to ensure we do not miss a header
let height = if tip.height > 5 {
tip.height - 5
} else {
0
};
let heights = get_locator_heights(height);
debug!(LOGGER, "Sync: locator heights: {:?}", heights);
let mut locator = vec![];
let mut current = chain.get_block_header(&tip.last_block_h);
while let Ok(header) = current {
if heights.contains(&header.height) {
locator.push(header.hash());
}
current = chain.get_block_header(&header.previous);
}
debug!(LOGGER, "Sync: locator: {:?}", locator);
Ok(locator)
}
// current height back to 0 decreasing in powers of 2

View file

@ -55,5 +55,5 @@ mod types;
pub use server::{DummyAdapter, Server};
pub use peer::Peer;
pub use types::{Capabilities, Error, NetAdapter, P2PConfig, PeerInfo, FULL_HIST, FULL_NODE,
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN};
pub use store::{PeerData, PeerStore, State};
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS, UNKNOWN};
pub use store::{PeerData, State};

View file

@ -229,8 +229,8 @@ impl NetAdapter for TrackingAdapter {
self.adapter.block_received(b)
}
fn headers_received(&self, bh: Vec<core::BlockHeader>) {
self.adapter.headers_received(bh)
fn headers_received(&self, bh: Vec<core::BlockHeader>, addr: SocketAddr) {
self.adapter.headers_received(bh, addr)
}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {

View file

@ -211,7 +211,7 @@ fn handle_payload(
}
Type::Headers => {
let headers = ser::deserialize::<Headers>(&mut &buf[..])?;
adapter.headers_received(headers.headers);
adapter.headers_received(headers.headers, addr);
Ok(None)
}
Type::GetPeerAddrs => {

View file

@ -34,6 +34,7 @@ use core::core::hash::Hash;
use core::core::target::Difficulty;
use handshake::Handshake;
use peer::Peer;
use store::{PeerStore, PeerData, State};
use types::*;
use util::LOGGER;
@ -45,7 +46,7 @@ impl NetAdapter for DummyAdapter {
}
fn transaction_received(&self, _: core::Transaction) {}
fn block_received(&self, _: core::Block) {}
fn headers_received(&self, _: Vec<core::BlockHeader>) {}
fn headers_received(&self, _: Vec<core::BlockHeader>, _: SocketAddr) {}
fn locate_headers(&self, _: Vec<Hash>) -> Vec<core::BlockHeader> {
vec![]
}
@ -65,6 +66,7 @@ impl NetAdapter for DummyAdapter {
pub struct Server {
config: P2PConfig,
capabilities: Capabilities,
store: Arc<PeerStore>,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
handshake: Arc<Handshake>,
adapter: Arc<NetAdapter>,
@ -78,20 +80,21 @@ unsafe impl Send for Server {}
impl Server {
/// Creates a new idle p2p server with no peers
pub fn new(
db_root: String,
capab: Capabilities,
config: P2PConfig,
peers: Arc<RwLock<HashMap<SocketAddr, Arc<RwLock<Peer>>>>>,
adapter: Arc<NetAdapter>,
genesis: Hash,
) -> Server {
Server {
) -> Result<Server, Error> {
Ok(Server {
config: config,
capabilities: capab,
peers: peers,
store: Arc::new(PeerStore::new(db_root)?),
peers: Arc::new(RwLock::new(HashMap::new())),
handshake: Arc::new(Handshake::new(genesis)),
adapter: adapter,
stop: RefCell::new(None),
}
})
}
/// Starts the p2p server. Opens a TCP port to allow incoming
@ -185,7 +188,7 @@ impl Server {
addr: SocketAddr,
h: reactor::Handle,
) -> Box<Future<Item = Option<Arc<RwLock<Peer>>>, Error = Error>> {
if let Some(p) = self.get_peer(addr) {
if let Some(p) = self.get_peer(&addr) {
// if we're already connected to the addr, just return the peer
return Box::new(future::ok(Some(p)));
}
@ -229,17 +232,17 @@ impl Server {
}
/// Check if the server already knows this peer (is already connected).
pub fn is_known(&self, addr: SocketAddr) -> bool {
pub fn is_known(&self, addr: &SocketAddr) -> bool {
self.get_peer(addr).is_some()
}
pub fn all_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
pub fn connected_peers(&self) -> Vec<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().values().map(|p| p.clone()).collect()
}
/// Get a peer we're connected to by address.
pub fn get_peer(&self, addr: SocketAddr) -> Option<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().get(&addr).map(|p| p.clone())
pub fn get_peer(&self, addr: &SocketAddr) -> Option<Arc<RwLock<Peer>>> {
self.peers.read().unwrap().get(addr).map(|p| p.clone())
}
/// Have the server iterate over its peer list and prune all peers we have
@ -249,7 +252,7 @@ impl Server {
let mut rm = vec![];
// build a list of peers to be cleaned up
for peer in self.all_peers() {
for peer in self.connected_peers() {
let peer_inner = peer.read().unwrap();
if !peer_inner.is_connected() {
debug!(LOGGER, "cleaning {:?}, not connected", peer_inner.info.addr);
@ -270,7 +273,7 @@ impl Server {
/// Returns the peer with the most worked branch, showing the highest total
/// difficulty.
pub fn most_work_peer(&self) -> Option<Arc<RwLock<Peer>>> {
let mut peers = self.all_peers();
let mut peers = self.connected_peers();
if peers.len() == 0 {
return None;
}
@ -293,7 +296,7 @@ impl Server {
let difficulty = self.adapter.total_difficulty();
let peers = self
.all_peers()
.connected_peers()
.iter()
.filter(|x| {
let peer = x.read().unwrap();
@ -312,7 +315,7 @@ impl Server {
/// may drop the broadcast request if it knows the remote peer already has
/// the block.
pub fn broadcast_block(&self, b: &core::Block) {
let peers = self.all_peers();
let peers = self.connected_peers();
let mut count = 0;
for p in peers {
let p = p.read().unwrap();
@ -331,7 +334,7 @@ impl Server {
/// implementation may drop the broadcast request if it knows the
/// remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let peers = self.all_peers();
let peers = self.connected_peers();
for p in peers {
let p = p.read().unwrap();
if p.is_connected() {
@ -344,19 +347,44 @@ impl Server {
/// Number of peers we're currently connected to.
pub fn peer_count(&self) -> u32 {
self.all_peers().len() as u32
self.connected_peers().len() as u32
}
/// Stops the server. Disconnect from all peers at the same time.
pub fn stop(self) {
info!(LOGGER, "calling stop on server");
let peers = self.all_peers();
let peers = self.connected_peers();
for peer in peers {
let peer = peer.read().unwrap();
peer.stop();
}
self.stop.into_inner().unwrap().send(()).unwrap();
}
/// All peer information we have in storage
pub fn all_peers(&self) -> Vec<PeerData> {
self.store.all_peers()
}
/// Find peers in store (not necessarily connected) and return their data
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
self.store.find_peers(state, cap, count)
}
/// Whether we've already seen a peer with the provided address
pub fn exists_peer(&self, peer_addr: SocketAddr) -> Result<bool, Error> {
self.store.exists_peer(peer_addr).map_err(From::from)
}
/// Saves updated information about a peer
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
self.store.save_peer(p).map_err(From::from)
}
/// Updates the state of a peer in store
pub fn update_state(&self, peer_addr: SocketAddr, new_state: State) -> Result<(), Error> {
self.store.update_state(peer_addr, new_state).map_err(From::from)
}
}
// Adds the peer built by the provided future in the peers map

View file

@ -25,6 +25,7 @@ use core::core;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser;
use grin_store;
/// Maximum number of block headers a peer should ever send
pub const MAX_BLOCK_HEADERS: u32 = 512;
@ -42,6 +43,7 @@ pub enum Error {
Connection(io::Error),
ConnectionClose,
Timeout,
Store(grin_store::Error),
PeerWithSelf,
ProtocolMismatch {
us: u32,
@ -58,6 +60,11 @@ impl From<ser::Error> for Error {
Error::Serialization(e)
}
}
impl From<grin_store::Error> for Error {
fn from(e: grin_store::Error) -> Error {
Error::Store(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Error {
Error::Connection(e)
@ -167,7 +174,7 @@ pub trait NetAdapter: Sync + Send {
/// A set of block header has been received, typically in response to a
/// block
/// header request.
fn headers_received(&self, bh: Vec<core::BlockHeader>);
fn headers_received(&self, bh: Vec<core::BlockHeader>, addr: SocketAddr);
/// Finds a list of block headers based on the provided locator. Tries to
/// identify the common chain and gets the headers that follow it

View file

@ -17,9 +17,8 @@ extern crate grin_core as core;
extern crate grin_p2p as p2p;
extern crate tokio_core;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time;
use futures::future::Future;
@ -38,14 +37,13 @@ fn peer_handshake() {
let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default();
let net_adapter = Arc::new(p2p::DummyAdapter {});
let connected_peers = Arc::new(RwLock::new(HashMap::new()));
let server = p2p::Server::new(
".grin".to_owned(),
p2p::UNKNOWN,
p2p_conf,
connected_peers,
net_adapter.clone(),
Hash::from_vec(vec![]),
);
).unwrap();
let run_server = server.start(handle.clone());
let my_addr = "127.0.0.1:5000".parse().unwrap();