sync_head port from testnet1 ()

* port simple_sync across from testnet1, sync head & improved fork handling
* introduce new sync_head for tracking header chain during sync
* add debug logging in body_sync for state of various heads
* update header_head during sync if header is now the one with most work, pass in sync and head contexts to pipe
* port across sync_head changes from master, put the 1s sleep back in...
* let sync run even if the full 512 headers are all known, give sync a chance to sync against a fork that forked a long time ago
* handle height jumping during a reorg
block validation should not check for height+1 based on head
no assumptions should be made about height indices
* quick(er) check for previous block in process_block (we cannot check height against ctx head)
* make body_sync a _lot_ faster by finding the forked block more efficiently...
* fix monitoring peers log msg
* fix chain tests
* fix grin tests - we were using the wrong genesis hash (wrong chain type)
* apparently needs setting in both places...
* body -> header -> sync ()
* port over body -> header -> sync changes from testnet1
This commit is contained in:
AntiochP 2017-12-04 14:16:57 -05:00 committed by Ignotus Peverell
parent 364c8cb797
commit cc9ec53390
10 changed files with 200 additions and 94 deletions

View file

@ -25,7 +25,7 @@ use core::core::pmmr::{HashSum, NoSum};
use core::core::{Block, BlockHeader, Output, TxKernel};
use core::core::target::Difficulty;
use core::core::hash::Hash;
use core::core::hash::{Hash, Hashed};
use grin_store::Error::NotFoundErr;
use pipe;
use store;
@ -99,6 +99,18 @@ impl Chain {
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
};
// make sure sync_head is available for later use
let _ = match chain_store.get_sync_head() {
Ok(tip) => tip,
Err(NotFoundErr) => {
let gen = chain_store.get_header_by_height(0).unwrap();
let tip = Tip::new(gen.hash());
chain_store.save_sync_head(&tip)?;
tip
},
Err(e) => return Err(Error::StoreErr(e, "chain init sync head".to_owned())),
};
info!(
LOGGER,
"Chain init: {:?}",
@ -176,19 +188,18 @@ impl Chain {
res
}
/// Attempt to add a new header to the header chain. Only necessary during
/// sync.
pub fn process_block_header(
/// Attempt to add a new header to the header chain.
/// This is only ever used during sync and uses sync_head.
pub fn sync_block_header(
&self,
bh: &BlockHeader,
opts: Options,
) -> Result<Option<Tip>, Error> {
let head = self.store
.get_header_head()
.map_err(|e| Error::StoreErr(e, "chain header head".to_owned()))?;
let ctx = self.ctx_from_head(head, opts);
pipe::process_block_header(bh, ctx)
let sync_head = self.get_sync_head()?;
let header_head = self.get_header_head()?;
let sync_ctx = self.ctx_from_head(sync_head, opts);
let header_ctx = self.ctx_from_head(header_head, opts);
pipe::sync_block_header(bh, sync_ctx, header_ctx)
}
fn ctx_from_head(&self, head: Tip, opts: Options) -> pipe::BlockContext {
@ -350,7 +361,15 @@ impl Chain {
.map_err(|e| Error::StoreErr(e, "chain get commitment".to_owned()))
}
/// Get the tip of the header chain
/// Get the tip of the current "sync" header chain.
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
self.store
.get_sync_head()
.map_err(|e| Error::StoreErr(e, "chain get sync head".to_owned()))
}
/// Get the tip of the header chain.
pub fn get_header_head(&self) -> Result<Tip, Error> {
self.store
.get_header_head()

View file

@ -23,6 +23,7 @@ use core::core::hash::{Hash, Hashed};
use core::core::{Block, BlockHeader};
use core::core::target::Difficulty;
use core::core::transaction;
use grin_store;
use types::*;
use store;
use sumtree;
@ -63,7 +64,21 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
validate_header(&b.header, &mut ctx)?;
// valid header, time to take the lock on the sum trees
// valid header, now check we actually have the previous block in the store
// not just the header but the block itself
// we cannot assume we can use the chain head for this as we may be dealing with a fork
// we cannot use heights here as the fork may have jumped in height
match ctx.store.get_block(&b.header.previous) {
Ok(_) => {},
Err(grin_store::Error::NotFoundErr) => {
return Err(Error::Orphan);
},
Err(e) => {
return Err(Error::StoreErr(e, "pipe get previous".to_owned()));
}
};
// valid header and we have a previous block, time to take the lock on the sum trees
let local_sumtrees = ctx.sumtrees.clone();
let mut sumtrees = local_sumtrees.write().unwrap();
@ -92,22 +107,31 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
})
}
/// Process the block header
pub fn process_block_header(bh: &BlockHeader, mut ctx: BlockContext) -> Result<Option<Tip>, Error> {
/// Process the block header.
/// This is only ever used during sync and uses a context based on sync_head.
pub fn sync_block_header(
bh: &BlockHeader,
mut sync_ctx: BlockContext,
mut header_ctx: BlockContext,
) -> Result<Option<Tip>, Error> {
debug!(
LOGGER,
"pipe: process_header {} at {}",
"pipe: sync_block_header {} at {}",
bh.hash(),
bh.height
);
check_known(bh.hash(), &mut ctx)?;
validate_header(&bh, &mut ctx)?;
add_block_header(bh, &mut ctx)?;
validate_header(&bh, &mut sync_ctx)?;
add_block_header(bh, &mut sync_ctx)?;
// TODO - confirm this is needed during sync process (I don't see how it is)
// we do not touch the sumtrees when syncing headers
// just taking the shared lock
let _ = ctx.sumtrees.write().unwrap();
let _ = header_ctx.sumtrees.write().unwrap();
update_header_head(bh, &mut ctx)
// now update the header_head (if new header with most work) and the sync_head (always)
update_header_head(bh, &mut header_ctx);
update_sync_head(bh, &mut sync_ctx)
}
/// Quick in-memory check to fast-reject any block we've already handled
@ -207,10 +231,6 @@ fn validate_block(
ctx: &mut BlockContext,
ext: &mut sumtree::Extension,
) -> Result<(), Error> {
if b.header.height > ctx.head.height + 1 {
return Err(Error::Orphan);
}
// main isolated block validation, checks all commitment sums and sigs
try!(b.validate().map_err(&Error::InvalidBlockProof));
@ -225,12 +245,22 @@ fn validate_block(
let mut hashes = vec![];
loop {
let curr_header = ctx.store.get_block_header(&current)?;
let height_header = ctx.store.get_header_by_height(curr_header.height)?;
if curr_header.hash() != height_header.hash() {
hashes.insert(0, curr_header.hash());
current = curr_header.previous;
} else {
break;
match ctx.store.get_header_by_height(curr_header.height) {
Ok(height_header) => {
if curr_header.hash() != height_header.hash() {
hashes.insert(0, curr_header.hash());
current = curr_header.previous;
} else {
break;
}
},
Err(grin_store::Error::NotFoundErr) => {
hashes.insert(0, curr_header.hash());
current = curr_header.previous;
},
Err(e) => {
return Err(Error::StoreErr(e, format!("header by height lookup failed")));
}
}
}
@ -354,25 +384,36 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error>
}
}
/// Directly updates the head if we've just appended a new block to it or handle
/// the situation where we've just added enough work to have a fork with more
/// work than the head.
fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
// if we made a fork with more work than the head (which should also be true
// when extending the head), update it
/// Update the sync head so we can keep syncing from where we left off.
fn update_sync_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
let tip = Tip::from_block(bh);
ctx.store
.save_sync_head(&tip)
.map_err(|e| Error::StoreErr(e, "pipe save sync head".to_owned()))?;
ctx.head = tip.clone();
info!(
LOGGER,
"pipe: updated sync head to {} at {}.",
bh.hash(),
bh.height,
);
Ok(Some(tip))
}
fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option<Tip>, Error> {
let tip = Tip::from_block(bh);
debug!(LOGGER, "pipe: update_header_head {},{}", tip.total_difficulty, ctx.head.total_difficulty);
if tip.total_difficulty > ctx.head.total_difficulty {
ctx.store
.save_header_head(&tip)
.map_err(|e| Error::StoreErr(e, "pipe save header head".to_owned()))?;
ctx.head = tip.clone();
info!(
LOGGER,
"Updated block header head to {} at {}.",
"pipe: updated header head to {} at {}.",
bh.hash(),
bh.height
);
bh.height,
);
Ok(Some(tip))
} else {
Ok(None)

View file

@ -31,6 +31,7 @@ const BLOCK_HEADER_PREFIX: u8 = 'h' as u8;
const BLOCK_PREFIX: u8 = 'b' as u8;
const HEAD_PREFIX: u8 = 'H' as u8;
const HEADER_HEAD_PREFIX: u8 = 'I' as u8;
const SYNC_HEAD_PREFIX: u8 = 's' as u8;
const HEADER_HEIGHT_PREFIX: u8 = '8' as u8;
const OUTPUT_COMMIT_PREFIX: u8 = 'o' as u8;
const HEADER_BY_OUTPUT_PREFIX: u8 = 'p' as u8;
@ -80,14 +81,21 @@ impl ChainStore for ChainKVStore {
self.db.put_ser(&vec![HEADER_HEAD_PREFIX], t)
}
fn get_sync_head(&self) -> Result<Tip, Error> {
option_to_not_found(self.db.get_ser(&vec![SYNC_HEAD_PREFIX]))
}
fn save_sync_head(&self, t: &Tip) -> Result<(), Error> {
self.db.put_ser(&vec![SYNC_HEAD_PREFIX], t)
}
fn get_block(&self, h: &Hash) -> Result<Block, Error> {
option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, &mut h.to_vec())))
}
fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
option_to_not_found(
self.db
.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())),
self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, &mut h.to_vec())),
)
}
@ -210,7 +218,10 @@ impl ChainStore for ChainKVStore {
/// current chain and updating "header_by_height" until we reach a
/// block_header
/// that is consistent with its height (everything prior to this will be
/// consistent)
/// consistent).
/// We need to handle the case where we have no index entry for a given height to
/// account for the case where we just switched to a new fork and the height jumped
/// beyond current chain height.
fn setup_height(&self, bh: &BlockHeader) -> Result<(), Error> {
self.db
.put_ser(&u64_to_key(HEADER_HEIGHT_PREFIX, bh.height), bh)?;
@ -218,25 +229,23 @@ impl ChainStore for ChainKVStore {
return Ok(());
}
let mut prev_h = bh.previous;
let mut prev_height = bh.height - 1;
while prev_height > 0 {
let prev = self.get_header_by_height(prev_height)?;
if prev.hash() != prev_h {
let prev_h = bh.previous;
let prev_height = bh.height - 1;
match self.get_header_by_height(prev_height) {
Ok(prev) => {
if prev.hash() != prev_h {
let real_prev = self.get_block_header(&prev_h)?;
self.setup_height(&real_prev)
} else {
Ok(())
}
},
Err(Error::NotFoundErr) => {
let real_prev = self.get_block_header(&prev_h)?;
self.db
.put_ser(
&u64_to_key(HEADER_HEIGHT_PREFIX, real_prev.height),
&real_prev,
)
.unwrap();
prev_h = real_prev.previous;
prev_height = real_prev.height - 1;
} else {
break;
}
self.setup_height(&real_prev)
},
Err(e) => Err(e)
}
Ok(())
}
}

View file

@ -200,6 +200,12 @@ pub trait ChainStore: Send + Sync {
/// Save the provided tip as the current head of the block header chain
fn save_header_head(&self, t: &Tip) -> Result<(), store::Error>;
/// Get the tip of the current sync header chain
fn get_sync_head(&self) -> Result<Tip, store::Error>;
/// Save the provided tip as the current head of the sync header chain
fn save_sync_head(&self, t: &Tip) -> Result<(), store::Error>;
/// Gets the block header at the provided height
fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, store::Error>;

View file

@ -16,14 +16,17 @@ extern crate env_logger;
extern crate grin_chain as chain;
extern crate grin_core as core;
extern crate grin_keychain as keychain;
extern crate grin_pow as pow;
extern crate rand;
use std::fs;
use chain::ChainStore;
use core::core::hash::Hashed;
use core::core::{Block, BlockHeader};
use core::core::Block;
use keychain::Keychain;
use core::global;
use core::global::ChainTypes;
fn clean_output_dir(dir_name: &str) {
let _ = fs::remove_dir_all(dir_name);
@ -39,7 +42,12 @@ fn test_various_store_indices() {
let chain_store = &chain::store::ChainKVStore::new(".grin".to_string()).unwrap() as &ChainStore;
let block = Block::new(&BlockHeader::default(), vec![], &keychain, &key_id).unwrap();
global::set_mining_mode(ChainTypes::AutomatedTesting);
let genesis = pow::mine_genesis_block(None).unwrap();
chain_store.save_block(&genesis).unwrap();
chain_store.setup_height(&genesis.header).unwrap();
let block = Block::new(&genesis.header, vec![], &keychain, &key_id).unwrap();
let commit = block.outputs[0].commitment();
let block_hash = block.hash();

View file

@ -89,7 +89,7 @@ impl NetAdapter for NetToChainAdapter {
// try to add each header to our header chain
let mut added_hs = vec![];
for bh in bhs {
let res = self.chain.process_block_header(&bh, self.chain_opts());
let res = self.chain.sync_block_header(&bh, self.chain_opts());
match res {
Ok(_) => {
added_hs.push(bh.hash());

View file

@ -86,7 +86,12 @@ impl Seeder {
let mon_loop = Timer::default()
.interval(time::Duration::from_secs(10))
.for_each(move |_| {
debug!(LOGGER, "monitoring peers ({})", p2p_server.connected_peers().len());
debug!(
LOGGER,
"monitoring peers ({} of {})",
p2p_server.connected_peers().len(),
p2p_server.all_peers().len(),
);
// maintenance step first, clean up p2p server peers and mark bans
// if needed

View file

@ -60,18 +60,38 @@ fn body_sync(
p2p_server: Arc<p2p::Server>,
chain: Arc<chain::Chain>,
) {
debug!(LOGGER, "block_sync: loop");
let body_head: chain::Tip = chain.head().unwrap();
let header_head: chain::Tip = chain.get_header_head().unwrap();
let sync_head: chain::Tip = chain.get_sync_head().unwrap();
debug!(
LOGGER,
"body_sync: body_head - {}, {}, header_head - {}, {}, sync_head - {}, {}",
body_head.last_block_h,
body_head.height,
header_head.last_block_h,
header_head.height,
sync_head.last_block_h,
sync_head.height,
);
let header_head = chain.get_header_head().unwrap();
let block_header = chain.head_header().unwrap();
let mut hashes = vec![];
if header_head.total_difficulty > block_header.total_difficulty {
if header_head.total_difficulty > body_head.total_difficulty {
let mut current = chain.get_block_header(&header_head.last_block_h);
while let Ok(header) = current {
if header.hash() == block_header.hash() {
break;
// look back through the sync chain until we find a header
// that is consistent with the height index (we know this is in the real chain)
match chain.get_header_by_height(header.height) {
Ok(height_header) => {
if header.hash() == height_header.hash() {
break;
}
},
Err(_) => {},
}
hashes.push(header.hash());
current = chain.get_block_header(&header.previous);
}
@ -89,12 +109,13 @@ fn body_sync(
debug!(
LOGGER,
"block_sync: requesting blocks ({}/{}), {:?}",
block_header.height,
body_head.height,
header_head.height,
hashes_to_get,
);
for hash in hashes_to_get.clone() {
// TODO - what condition should we choose most_work_peer v random_peer (if any?)
let peer = if hashes_to_get.len() < 100 {
p2p_server.most_work_peer()
} else {
@ -128,24 +149,17 @@ pub fn header_sync(
let peer_difficulty = p.info.total_difficulty.clone();
if peer_difficulty > difficulty {
debug!(
LOGGER,
"header_sync: difficulty {} vs {}",
peer_difficulty,
difficulty,
);
let _ = request_headers(
peer.clone(),
chain.clone(),
);
);
}
}
thread::sleep(Duration::from_secs(30));
thread::sleep(Duration::from_secs(5));
}
/// Request some block headers from a peer to advance us
/// Request some block headers from a peer to advance us.
fn request_headers(
peer: Arc<RwLock<Peer>>,
chain: Arc<chain::Chain>,
@ -154,7 +168,7 @@ fn request_headers(
let peer = peer.read().unwrap();
debug!(
LOGGER,
"Sync: Asking peer {} for more block headers, locator: {:?}",
"sync: asking {} for headers, locator: {:?}",
peer.info.addr,
locator,
);
@ -162,19 +176,14 @@ fn request_headers(
Ok(())
}
/// We build a locator based on sync_head.
/// Even if sync_head is significantly out of date we will "reset" it once we start getting
/// headers back from a peer.
fn get_locator(chain: Arc<chain::Chain>) -> Result<Vec<Hash>, Error> {
let tip = chain.get_header_head()?;
let tip = chain.get_sync_head()?;
let heights = get_locator_heights(tip.height);
// TODO - is this necessary?
// go back to earlier header height to ensure we do not miss a header
let height = if tip.height > 5 {
tip.height - 5
} else {
0
};
let heights = get_locator_heights(height);
debug!(LOGGER, "Sync: locator heights: {:?}", heights);
debug!(LOGGER, "sync: locator heights: {:?}", heights);
let mut locator = vec![];
let mut current = chain.get_block_header(&tip.last_block_h);
@ -185,7 +194,7 @@ fn get_locator(chain: Arc<chain::Chain>) -> Result<Vec<Hash>, Error> {
current = chain.get_block_header(&header.previous);
}
debug!(LOGGER, "Sync: locator: {:?}", locator);
debug!(LOGGER, "sync: locator: {:?}", locator);
Ok(locator)
}

View file

@ -205,6 +205,7 @@ impl LocalServerContainer {
}),
seeds: Some(seeds),
seeding_type: seeding_type,
chain_type: core::global::ChainTypes::AutomatedTesting,
..Default::default()
},
&event_loop.handle(),

View file

@ -187,6 +187,9 @@ fn simulate_parallel_mining() {
#[test]
fn a_simulate_block_propagation() {
util::init_test_logger();
// we actually set the chain_type in the ServerConfig below
// TODO - avoid needing to set it in two places?
global::set_mining_mode(ChainTypes::AutomatedTesting);
let test_name_dir = "grin-prop";
@ -222,6 +225,7 @@ fn a_simulate_block_propagation() {
}),
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:18000".to_string()]),
chain_type: core::global::ChainTypes::AutomatedTesting,
..Default::default()
},
&handle,
@ -246,6 +250,9 @@ fn a_simulate_block_propagation() {
#[test]
fn simulate_full_sync() {
util::init_test_logger();
// we actually set the chain_type in the ServerConfig below
// TODO - avoid needing to set it in two places?
global::set_mining_mode(ChainTypes::AutomatedTesting);
let test_name_dir = "grin-sync";
@ -281,6 +288,7 @@ fn simulate_full_sync() {
}),
seeding_type: grin::Seeding::List,
seeds: Some(vec!["127.0.0.1:11000".to_string()]),
chain_type: core::global::ChainTypes::AutomatedTesting,
..Default::default()
};
let s = grin::Server::future(config, &handle).unwrap();