mirror of
https://github.com/mimblewimble/grin.git
synced 2025-04-30 06:11:14 +03:00
Lots of chain sync and block validation fixes
* Fix for the chain pipeline partly relying on an outdated head, leading to not properly recognizing a fork and inconsistent sum tree state. * Do not drop block requests during sync that don't get satisfied, retry enough time to get them and avoid stall. * Always validate header, even in sync where we may have validated it already. We don't want a block coming from a peer that could squeeze through with an invalid header. * When syncing, do not mark blocks that were errored by the chain as received (typical case: orphan). Keep retrying. * Improved chain state dump for debugging. * Do not add to orphans blocks too far in the future. * Better error reporting on db errors. * Related sync test fixes. TODO figure out why syncing peers timeout so often, very useful to test but not that great for a fast sync experience.
This commit is contained in:
parent
f12559f53b
commit
f1488f9529
11 changed files with 183 additions and 143 deletions
chain/src
core/src/core
grin
src/bin
|
@ -96,7 +96,7 @@ impl Chain {
|
|||
info!(LOGGER, "Saved genesis block with hash {}", gen.hash());
|
||||
tip
|
||||
}
|
||||
Err(e) => return Err(Error::StoreErr(e)),
|
||||
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
|
||||
};
|
||||
|
||||
let store = Arc::new(chain_store);
|
||||
|
@ -116,7 +116,8 @@ impl Chain {
|
|||
/// has been added to the longest chain, None if it's added to an (as of
|
||||
/// now) orphan chain.
|
||||
pub fn process_block(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
|
||||
let head = self.store.head().map_err(&Error::StoreErr)?;
|
||||
let head = self.store.head().map_err(|e| Error::StoreErr(e, "chain load head".to_owned()))?;
|
||||
let height = head.height;
|
||||
let ctx = self.ctx_from_head(head, opts);
|
||||
|
||||
let res = pipe::process_block(&b, ctx);
|
||||
|
@ -140,9 +141,11 @@ impl Chain {
|
|||
}
|
||||
Ok(None) => {}
|
||||
Err(Error::Orphan) => {
|
||||
let mut orphans = self.orphans.lock().unwrap();
|
||||
orphans.push_front((opts, b));
|
||||
orphans.truncate(MAX_ORPHANS);
|
||||
if b.header.height < height + (MAX_ORPHANS as u64) {
|
||||
let mut orphans = self.orphans.lock().unwrap();
|
||||
orphans.push_front((opts, b));
|
||||
orphans.truncate(MAX_ORPHANS);
|
||||
}
|
||||
}
|
||||
Err(ref e) => {
|
||||
info!(
|
||||
|
@ -166,7 +169,7 @@ impl Chain {
|
|||
opts: Options,
|
||||
) -> Result<Option<Tip>, Error> {
|
||||
|
||||
let head = self.store.get_header_head().map_err(&Error::StoreErr)?;
|
||||
let head = self.store.get_header_head().map_err(|e| Error::StoreErr(e, "chain header head".to_owned()))?;
|
||||
let ctx = self.ctx_from_head(head, opts);
|
||||
|
||||
pipe::process_block_header(bh, ctx)
|
||||
|
@ -221,8 +224,8 @@ impl Chain {
|
|||
let sumtrees = self.sumtrees.read().unwrap();
|
||||
let is_unspent = sumtrees.is_unspent(output_ref)?;
|
||||
if is_unspent {
|
||||
self.store.get_output_by_commit(output_ref).map_err(
|
||||
&Error::StoreErr,
|
||||
self.store.get_output_by_commit(output_ref).map_err(|e|
|
||||
Error::StoreErr(e, "chain get unspent".to_owned())
|
||||
)
|
||||
} else {
|
||||
Err(Error::OutputNotFound)
|
||||
|
@ -259,23 +262,23 @@ impl Chain {
|
|||
|
||||
/// Block header for the chain head
|
||||
pub fn head_header(&self) -> Result<BlockHeader, Error> {
|
||||
self.store.head_header().map_err(&Error::StoreErr)
|
||||
self.store.head_header().map_err(|e| Error::StoreErr(e, "chain head header".to_owned()))
|
||||
}
|
||||
|
||||
/// Gets a block header by hash
|
||||
pub fn get_block(&self, h: &Hash) -> Result<Block, Error> {
|
||||
self.store.get_block(h).map_err(&Error::StoreErr)
|
||||
self.store.get_block(h).map_err(|e| Error::StoreErr(e, "chain get block".to_owned()))
|
||||
}
|
||||
|
||||
/// Gets a block header by hash
|
||||
pub fn get_block_header(&self, h: &Hash) -> Result<BlockHeader, Error> {
|
||||
self.store.get_block_header(h).map_err(&Error::StoreErr)
|
||||
self.store.get_block_header(h).map_err(|e| Error::StoreErr(e, "chain get header".to_owned()))
|
||||
}
|
||||
|
||||
/// Gets the block header at the provided height
|
||||
pub fn get_header_by_height(&self, height: u64) -> Result<BlockHeader, Error> {
|
||||
self.store.get_header_by_height(height).map_err(
|
||||
&Error::StoreErr,
|
||||
self.store.get_header_by_height(height).map_err(|e|
|
||||
Error::StoreErr(e, "chain get header by height".to_owned()),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -286,12 +289,12 @@ impl Chain {
|
|||
) -> Result<BlockHeader, Error> {
|
||||
self.store
|
||||
.get_block_header_by_output_commit(commit)
|
||||
.map_err(&Error::StoreErr)
|
||||
.map_err(|e| Error::StoreErr(e, "chain get commitment".to_owned()))
|
||||
}
|
||||
|
||||
/// Get the tip of the header chain
|
||||
pub fn get_header_head(&self) -> Result<Tip, Error> {
|
||||
self.store.get_header_head().map_err(&Error::StoreErr)
|
||||
self.store.get_header_head().map_err(|e |Error::StoreErr(e, "chain get header head".to_owned()))
|
||||
}
|
||||
|
||||
/// Builds an iterator on blocks starting from the current chain head and
|
||||
|
|
|
@ -63,10 +63,16 @@ pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result<Option<Tip>, Er
|
|||
|
||||
validate_header(&b.header, &mut ctx)?;
|
||||
|
||||
// take the lock on the sum trees and start a chain extension unit of work
|
||||
// dependent on the success of the internal validation and saving operations
|
||||
// valid header, time to take the lock on the sum trees
|
||||
let local_sumtrees = ctx.sumtrees.clone();
|
||||
let mut sumtrees = local_sumtrees.write().unwrap();
|
||||
|
||||
// update head now that we're in the lock
|
||||
ctx.head = ctx.store.head().
|
||||
map_err(|e| Error::StoreErr(e, "pipe reload head".to_owned()))?;
|
||||
|
||||
// start a chain extension unit of work dependent on the success of the
|
||||
// internal validation and saving operations
|
||||
sumtree::extending(&mut sumtrees, |mut extension| {
|
||||
|
||||
validate_block(b, &mut ctx, &mut extension)?;
|
||||
|
@ -162,8 +168,8 @@ fn validate_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), E
|
|||
}
|
||||
|
||||
// first I/O cost, better as late as possible
|
||||
let prev = try!(ctx.store.get_block_header(&header.previous).map_err(
|
||||
&Error::StoreErr,
|
||||
let prev = try!(ctx.store.get_block_header(&header.previous).map_err(|e|
|
||||
Error::StoreErr(e, format!("previous block header {}", header.previous)),
|
||||
));
|
||||
|
||||
if header.height != prev.height + 1 {
|
||||
|
@ -208,19 +214,6 @@ fn validate_block(
|
|||
let curve = secp::Secp256k1::with_caps(secp::ContextFlag::Commit);
|
||||
try!(b.validate(&curve).map_err(&Error::InvalidBlockProof));
|
||||
|
||||
// check that all the outputs of the block are "new" -
|
||||
// that they do not clobber any existing unspent outputs (by their commitment)
|
||||
//
|
||||
// TODO - do we need to do this here (and can we do this here if we need access
|
||||
// to the chain)
|
||||
// see check_duplicate_outputs in pool for the analogous operation on
|
||||
// transaction outputs
|
||||
// for output in &block.outputs {
|
||||
// here we would check that the output is not a duplicate output based on the
|
||||
// current chain
|
||||
// };
|
||||
|
||||
|
||||
// apply the new block to the MMR trees and check the new root hashes
|
||||
if b.header.previous == ctx.head.last_block_h {
|
||||
// standard head extension
|
||||
|
@ -268,7 +261,7 @@ fn validate_block(
|
|||
kernel_root.hash != b.header.kernel_root
|
||||
{
|
||||
|
||||
ext.dump();
|
||||
ext.dump(false);
|
||||
return Err(Error::InvalidRoot);
|
||||
}
|
||||
|
||||
|
@ -297,14 +290,12 @@ fn validate_block(
|
|||
|
||||
/// Officially adds the block to our chain.
|
||||
fn add_block(b: &Block, ctx: &mut BlockContext) -> Result<(), Error> {
|
||||
ctx.store.save_block(b).map_err(&Error::StoreErr)?;
|
||||
|
||||
Ok(())
|
||||
ctx.store.save_block(b).map_err(|e| Error::StoreErr(e, "pipe save block".to_owned()))
|
||||
}
|
||||
|
||||
/// Officially adds the block header to our header chain.
|
||||
fn add_block_header(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
|
||||
ctx.store.save_block_header(bh).map_err(&Error::StoreErr)
|
||||
ctx.store.save_block_header(bh).map_err(|e| Error::StoreErr(e, "pipe save header".to_owned()))
|
||||
}
|
||||
|
||||
/// Directly updates the head if we've just appended a new block to it or handle
|
||||
|
@ -317,18 +308,16 @@ fn update_head(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, Error>
|
|||
if tip.total_difficulty > ctx.head.total_difficulty {
|
||||
|
||||
// update the block height index
|
||||
ctx.store.setup_height(&b.header).map_err(&Error::StoreErr)?;
|
||||
ctx.store.setup_height(&b.header).map_err(|e| Error::StoreErr(e, "pipe setup height".to_owned()))?;
|
||||
|
||||
// in sync mode, only update the "body chain", otherwise update both the
|
||||
// "header chain" and "body chain", updating the header chain in sync resets
|
||||
// all additional "future" headers we've received
|
||||
if ctx.opts.intersects(SYNC) {
|
||||
ctx.store.save_body_head(&tip).map_err(&Error::StoreErr)?;
|
||||
ctx.store.save_body_head(&tip).map_err(|e| Error::StoreErr(e, "pipe save body".to_owned()))?;
|
||||
} else {
|
||||
ctx.store.save_head(&tip).map_err(&Error::StoreErr)?;
|
||||
ctx.store.save_head(&tip).map_err(|e| Error::StoreErr(e, "pipe save head".to_owned()))?;
|
||||
}
|
||||
|
||||
ctx.store.save_head(&tip).map_err(&Error::StoreErr)?;
|
||||
ctx.head = tip.clone();
|
||||
info!(LOGGER, "Updated head to {} at {}.", b.hash(), b.header.height);
|
||||
Ok(Some(tip))
|
||||
|
@ -345,7 +334,7 @@ fn update_header_head(bh: &BlockHeader, ctx: &mut BlockContext) -> Result<Option
|
|||
// when extending the head), update it
|
||||
let tip = Tip::from_block(bh);
|
||||
if tip.total_difficulty > ctx.head.total_difficulty {
|
||||
ctx.store.save_header_head(&tip).map_err(&Error::StoreErr)?;
|
||||
ctx.store.save_header_head(&tip).map_err(|e| Error::StoreErr(e, "pipe save header head".to_owned()))?;
|
||||
|
||||
ctx.head = tip.clone();
|
||||
info!(
|
||||
|
|
|
@ -29,6 +29,7 @@ use grin_store;
|
|||
use grin_store::sumtree::PMMRBackend;
|
||||
use types::ChainStore;
|
||||
use types::Error;
|
||||
use util::LOGGER;
|
||||
|
||||
const SUMTREES_SUBDIR: &'static str = "sumtrees";
|
||||
const UTXO_SUBDIR: &'static str = "utxo";
|
||||
|
@ -94,7 +95,7 @@ impl SumTrees {
|
|||
match rpos {
|
||||
Ok(pos) => Ok(self.output_pmmr_h.backend.get(pos).is_some()),
|
||||
Err(grin_store::Error::NotFoundErr) => Ok(false),
|
||||
Err(e) => Err(Error::StoreErr(e)),
|
||||
Err(e) => Err(Error::StoreErr(e, "sumtree unspent check".to_owned())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -115,6 +116,7 @@ where
|
|||
let res: Result<T, Error>;
|
||||
let rollback: bool;
|
||||
{
|
||||
debug!(LOGGER, "Starting new sumtree extension.");
|
||||
let commit_index = trees.commit_index.clone();
|
||||
let mut extension = Extension::new(trees, commit_index);
|
||||
res = inner(&mut extension);
|
||||
|
@ -126,6 +128,7 @@ where
|
|||
}
|
||||
match res {
|
||||
Err(e) => {
|
||||
debug!(LOGGER, "Error returned, discarding sumtree extension.");
|
||||
trees.output_pmmr_h.backend.discard();
|
||||
trees.rproof_pmmr_h.backend.discard();
|
||||
trees.kernel_pmmr_h.backend.discard();
|
||||
|
@ -133,10 +136,12 @@ where
|
|||
}
|
||||
Ok(r) => {
|
||||
if rollback {
|
||||
debug!(LOGGER, "Rollbacking sumtree extension.");
|
||||
trees.output_pmmr_h.backend.discard();
|
||||
trees.rproof_pmmr_h.backend.discard();
|
||||
trees.kernel_pmmr_h.backend.discard();
|
||||
} else {
|
||||
debug!(LOGGER, "Committing sumtree extension.");
|
||||
trees.output_pmmr_h.backend.sync()?;
|
||||
trees.rproof_pmmr_h.backend.sync()?;
|
||||
trees.kernel_pmmr_h.backend.sync()?;
|
||||
|
@ -145,6 +150,7 @@ where
|
|||
trees.kernel_pmmr_h.last_pos = sizes.2;
|
||||
}
|
||||
|
||||
debug!(LOGGER, "Sumtree extension done.");
|
||||
Ok(r)
|
||||
}
|
||||
}
|
||||
|
@ -264,6 +270,7 @@ impl<'a> Extension<'a> {
|
|||
let out_pos_rew = self.commit_index.get_output_pos(&output.commitment())?;
|
||||
let kern_pos_rew = self.commit_index.get_kernel_pos(&kernel.excess)?;
|
||||
|
||||
debug!(LOGGER, "Rewind sumtrees to {}", out_pos_rew);
|
||||
self.output_pmmr
|
||||
.rewind(out_pos_rew, height as u32)
|
||||
.map_err(&Error::SumTreeErr)?;
|
||||
|
@ -273,6 +280,7 @@ impl<'a> Extension<'a> {
|
|||
self.kernel_pmmr
|
||||
.rewind(kern_pos_rew, height as u32)
|
||||
.map_err(&Error::SumTreeErr)?;
|
||||
self.dump(true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -293,14 +301,17 @@ impl<'a> Extension<'a> {
|
|||
self.rollback = true;
|
||||
}
|
||||
|
||||
/// Dumps the state of the 3 sum trees to stdout for debugging
|
||||
pub fn dump(&self) {
|
||||
println!("-- outputs --");
|
||||
self.output_pmmr.dump();
|
||||
println!("-- range proofs --");
|
||||
self.rproof_pmmr.dump();
|
||||
println!("-- kernels --");
|
||||
self.kernel_pmmr.dump();
|
||||
/// Dumps the state of the 3 sum trees to stdout for debugging. Short version
|
||||
/// only prints the UTXO tree.
|
||||
pub fn dump(&self, short: bool) {
|
||||
debug!(LOGGER, "-- outputs --");
|
||||
self.output_pmmr.dump(short);
|
||||
if !short {
|
||||
debug!(LOGGER, "-- range proofs --");
|
||||
self.rproof_pmmr.dump(short);
|
||||
debug!(LOGGER, "-- kernels --");
|
||||
self.kernel_pmmr.dump(short);
|
||||
}
|
||||
}
|
||||
|
||||
// Sizes of the sum trees, used by `extending` on rollback.
|
||||
|
|
|
@ -75,7 +75,7 @@ pub enum Error {
|
|||
/// Invalid block version, either a mistake or outdated software
|
||||
InvalidBlockVersion(u16),
|
||||
/// Internal issue when trying to save or load data from store
|
||||
StoreErr(grin_store::Error),
|
||||
StoreErr(grin_store::Error, String),
|
||||
/// Error serializing or deserializing a type
|
||||
SerErr(ser::Error),
|
||||
/// Error while updating the sum trees
|
||||
|
@ -88,7 +88,7 @@ pub enum Error {
|
|||
|
||||
impl From<grin_store::Error> for Error {
|
||||
fn from(e: grin_store::Error) -> Error {
|
||||
Error::StoreErr(e)
|
||||
Error::StoreErr(e, "wrapped".to_owned())
|
||||
}
|
||||
}
|
||||
impl From<ser::Error> for Error {
|
||||
|
|
|
@ -30,7 +30,7 @@ use global;
|
|||
use keychain;
|
||||
|
||||
/// Errors thrown by Block validation
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum Error {
|
||||
/// The sum of output minus input commitments does not match the sum of
|
||||
/// kernel commitments
|
||||
|
|
|
@ -41,6 +41,7 @@ use std::ops::{self, Deref};
|
|||
|
||||
use core::hash::{Hash, Hashed};
|
||||
use ser::{self, Readable, Reader, Writeable, Writer};
|
||||
use util::LOGGER;
|
||||
|
||||
/// Trait for an element of the tree that has a well-defined sum and hash that
|
||||
/// the tree can sum over
|
||||
|
@ -355,25 +356,31 @@ where
|
|||
self.last_pos
|
||||
}
|
||||
|
||||
/// Debugging utility to print information about the MMRs.
|
||||
pub fn dump(&self) {
|
||||
/// Debugging utility to print information about the MMRs. Short version
|
||||
/// only prints the last 8 nodes.
|
||||
pub fn dump(&self, short: bool) {
|
||||
let sz = self.unpruned_size();
|
||||
if sz > 25 {
|
||||
if sz > 600 {
|
||||
return;
|
||||
}
|
||||
println!("UXTO set, size: {}", sz);
|
||||
for n in 0..sz {
|
||||
print!("{:>8} ", n + 1);
|
||||
}
|
||||
println!("");
|
||||
for n in 1..(sz + 1) {
|
||||
let ohs = self.get(n);
|
||||
match ohs {
|
||||
Some(hs) => print!("{} ", hs.hash),
|
||||
None => print!("{:>8} ", "??"),
|
||||
}
|
||||
}
|
||||
println!("");
|
||||
let start = if short && sz > 7 { sz/8 - 1 } else { 0 };
|
||||
for n in start..(sz/8+1) {
|
||||
let mut idx = "".to_owned();
|
||||
let mut hashes = "".to_owned();
|
||||
for m in (n*8)..(n+1)*8 {
|
||||
if m >= sz {
|
||||
break;
|
||||
}
|
||||
idx.push_str(&format!("{:>8} ", m + 1));
|
||||
let ohs = self.get(m+1);
|
||||
match ohs {
|
||||
Some(hs) => hashes.push_str(&format!("{} ", hs.hash)),
|
||||
None => hashes.push_str(&format!("{:>8} ", "??")),
|
||||
}
|
||||
}
|
||||
debug!(LOGGER, "{}", idx);
|
||||
debug!(LOGGER, "{}", hashes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -68,13 +68,17 @@ impl NetAdapter for NetToChainAdapter {
|
|||
// pushing the new block through the chain pipeline
|
||||
let res = self.chain.process_block(b, self.chain_opts());
|
||||
|
||||
if let Err(e) = res {
|
||||
if let &Err(ref e) = &res {
|
||||
debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
|
||||
}
|
||||
|
||||
if self.syncing() {
|
||||
self.syncer.borrow().block_received(bhash);
|
||||
}
|
||||
}
|
||||
|
||||
if self.syncing() {
|
||||
match res {
|
||||
Ok(_) => self.syncer.borrow().block_received(bhash),
|
||||
Err(chain::Error::Unfit(_)) => self.syncer.borrow().block_received(bhash),
|
||||
Err(_) => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn headers_received(&self, bhs: Vec<core::BlockHeader>) {
|
||||
|
@ -95,11 +99,12 @@ impl NetAdapter for NetToChainAdapter {
|
|||
s
|
||||
);
|
||||
}
|
||||
Err(chain::Error::StoreErr(e)) => {
|
||||
Err(chain::Error::StoreErr(e, explanation)) => {
|
||||
error!(
|
||||
LOGGER,
|
||||
"Store error processing block header {}: {:?}",
|
||||
"Store error processing block header {}: in {} {:?}",
|
||||
bh.hash(),
|
||||
explanation,
|
||||
e
|
||||
);
|
||||
return;
|
||||
|
@ -130,7 +135,7 @@ impl NetAdapter for NetToChainAdapter {
|
|||
let known = self.chain.get_block_header(&locator[0]);
|
||||
let header = match known {
|
||||
Ok(header) => header,
|
||||
Err(chain::Error::StoreErr(store::Error::NotFoundErr)) => {
|
||||
Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => {
|
||||
return self.locate_headers(locator[1..].to_vec());
|
||||
}
|
||||
Err(e) => {
|
||||
|
@ -146,7 +151,7 @@ impl NetAdapter for NetToChainAdapter {
|
|||
let header = self.chain.get_header_by_height(h);
|
||||
match header {
|
||||
Ok(head) => headers.push(head),
|
||||
Err(chain::Error::StoreErr(store::Error::NotFoundErr)) => break,
|
||||
Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => break,
|
||||
Err(e) => {
|
||||
error!(LOGGER, "Could not build header locator: {:?}", e);
|
||||
return vec![];
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
/// How many block bodies to download in parallel
|
||||
const MAX_BODY_DOWNLOADS: usize = 8;
|
||||
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::{Instant, Duration};
|
||||
|
@ -30,6 +31,16 @@ use p2p;
|
|||
use types::Error;
|
||||
use util::LOGGER;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct BlockDownload {
|
||||
hash: Hash,
|
||||
start_time: Instant,
|
||||
retries: u8,
|
||||
}
|
||||
|
||||
/// Manages syncing the local chain with other peers. Needs both a head chain
|
||||
/// and a full block chain to operate. First tries to advance the header
|
||||
/// chain as much as possible, then downloads the full blocks by batches.
|
||||
pub struct Syncer {
|
||||
chain: Arc<chain::Chain>,
|
||||
p2p: Arc<p2p::Server>,
|
||||
|
@ -37,7 +48,7 @@ pub struct Syncer {
|
|||
sync: Mutex<bool>,
|
||||
last_header_req: Mutex<Instant>,
|
||||
blocks_to_download: Mutex<Vec<Hash>>,
|
||||
blocks_downloading: Mutex<Vec<(Hash, Instant)>>,
|
||||
blocks_downloading: Mutex<Vec<BlockDownload>>,
|
||||
}
|
||||
|
||||
impl Syncer {
|
||||
|
@ -82,6 +93,7 @@ impl Syncer {
|
|||
let tip = self.chain.get_header_head()?;
|
||||
// TODO do something better (like trying to get more) if we lose peers
|
||||
let peer = self.p2p.most_work_peer().unwrap();
|
||||
debug!(LOGGER, "Sync: peer {} vs us {}", peer.info.total_difficulty, tip.total_difficulty);
|
||||
|
||||
let more_headers = peer.info.total_difficulty > tip.total_difficulty;
|
||||
let more_bodies = {
|
||||
|
@ -92,7 +104,7 @@ impl Syncer {
|
|||
|
||||
{
|
||||
let last_header_req = self.last_header_req.lock().unwrap().clone();
|
||||
if more_headers && (Instant::now() - Duration::from_secs(2) > last_header_req) {
|
||||
if more_headers || (Instant::now() - Duration::from_secs(30) > last_header_req) {
|
||||
self.request_headers()?;
|
||||
}
|
||||
}
|
||||
|
@ -147,46 +159,48 @@ impl Syncer {
|
|||
let mut blocks_to_download = self.blocks_to_download.lock().unwrap();
|
||||
let mut blocks_downloading = self.blocks_downloading.lock().unwrap();
|
||||
|
||||
// clean up potentially dead downloads
|
||||
let twenty_sec_ago = Instant::now() - Duration::from_secs(20);
|
||||
let too_old_pos = (0..blocks_downloading.len())
|
||||
.filter(|p| blocks_downloading[*p].1 < twenty_sec_ago)
|
||||
.collect::<Vec<_>>();
|
||||
let mut offs = 0;
|
||||
for too_old in too_old_pos {
|
||||
let block_h = blocks_downloading.remove(too_old - offs);
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Download request expired for {}, will re-issue.",
|
||||
block_h.0
|
||||
);
|
||||
blocks_to_download.push(block_h.0);
|
||||
offs += 1;
|
||||
}
|
||||
// retry blocks not downloading
|
||||
let now = Instant::now();
|
||||
for mut download in blocks_downloading.deref_mut() {
|
||||
let elapsed = (now - download.start_time).as_secs();
|
||||
if download.retries >= 8 {
|
||||
panic!("Failed to download required block {}", download.hash);
|
||||
}
|
||||
if download.retries < (elapsed / 5) as u8 {
|
||||
debug!(LOGGER, "Retry {} on block {}", download.retries, download.hash);
|
||||
self.request_block(download.hash);
|
||||
download.retries += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// consume hashes from blocks to download, place them in downloading and
|
||||
// request them from the network
|
||||
let mut count = 0;
|
||||
while blocks_to_download.len() > 0 && blocks_downloading.len() < MAX_BODY_DOWNLOADS {
|
||||
let h = blocks_to_download.pop().unwrap();
|
||||
let peer = self.p2p.random_peer().unwrap();
|
||||
let send_result = peer.send_block_request(h);
|
||||
if let Err(e) = send_result {
|
||||
debug!(LOGGER, "Error requesting block: {:?}", e);
|
||||
}
|
||||
blocks_downloading.push((h, Instant::now()));
|
||||
}
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Requesting full blocks to download, total left: {}.",
|
||||
blocks_to_download.len()
|
||||
);
|
||||
self.request_block(h);
|
||||
count += 1;
|
||||
blocks_downloading.push(
|
||||
BlockDownload {
|
||||
hash: h,
|
||||
start_time: Instant::now(),
|
||||
retries: 0
|
||||
});
|
||||
}
|
||||
debug!(
|
||||
LOGGER,
|
||||
"Requested {} full blocks to download, total left: {}. Current list: {:?}.",
|
||||
count,
|
||||
blocks_to_download.len(),
|
||||
blocks_downloading.deref(),
|
||||
);
|
||||
}
|
||||
|
||||
/// We added a block, clean up the downloading structure
|
||||
pub fn block_received(&self, bh: Hash) {
|
||||
// just clean up the downloading list
|
||||
let mut bds = self.blocks_downloading.lock().unwrap();
|
||||
bds.iter().position(|&h| h.0 == bh).map(|n| bds.remove(n));
|
||||
bds.iter().position(|ref h| h.hash == bh).map(|n| bds.remove(n));
|
||||
}
|
||||
|
||||
/// Request some block headers from a peer to advance us
|
||||
|
@ -243,6 +257,7 @@ impl Syncer {
|
|||
})
|
||||
.collect::<Vec<_>>();
|
||||
heights.append(&mut tail);
|
||||
debug!(LOGGER, "Loc heights: {:?}", heights);
|
||||
|
||||
// Iteratively travel the header chain back from our head and retain the
|
||||
// headers at the wanted heights.
|
||||
|
@ -259,4 +274,14 @@ impl Syncer {
|
|||
}
|
||||
Ok(locator)
|
||||
}
|
||||
|
||||
/// Pick a random peer and ask for a block by hash
|
||||
fn request_block(&self, h: Hash) {
|
||||
let peer = self.p2p.random_peer().unwrap();
|
||||
let send_result = peer.send_block_request(h);
|
||||
if let Err(e) = send_result {
|
||||
debug!(LOGGER, "Error requesting block: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -68,6 +68,8 @@ pub enum Seeding {
|
|||
List,
|
||||
/// Automatically download a text file with a list of server addresses
|
||||
WebStatic,
|
||||
/// Mostly for tests, where connections are initiated programmatically
|
||||
Programmatic,
|
||||
}
|
||||
|
||||
/// Full server configuration, aggregating configurations required for the
|
||||
|
|
|
@ -19,6 +19,7 @@ extern crate grin_chain as chain;
|
|||
extern crate grin_api as api;
|
||||
extern crate grin_wallet as wallet;
|
||||
extern crate grin_pow as pow;
|
||||
extern crate grin_util as util;
|
||||
extern crate secp256k1zkp as secp;
|
||||
|
||||
extern crate futures;
|
||||
|
@ -250,13 +251,11 @@ fn a_simulate_block_propagation() {
|
|||
}));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/// Creates 2 different disconnected servers, mine a few blocks on one, connect
|
||||
/// them and check that the 2nd gets all the blocks
|
||||
#[test]
|
||||
fn simulate_full_sync() {
|
||||
util::init_test_logger();
|
||||
global::set_mining_mode(MiningParameterMode::AutomatedTesting);
|
||||
|
||||
let test_name_dir = "grin-sync";
|
||||
|
@ -283,17 +282,18 @@ fn simulate_full_sync() {
|
|||
// instantiates 2 servers on different ports
|
||||
let mut servers = vec![];
|
||||
for n in 0..2 {
|
||||
let s = grin::Server::future(
|
||||
grin::ServerConfig {
|
||||
db_root: format!("target/{}/grin-sync-{}", test_name_dir, n),
|
||||
p2p_config: Some(p2p::P2PConfig {
|
||||
port: 11000 + n,
|
||||
..p2p::P2PConfig::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
&handle,
|
||||
).unwrap();
|
||||
let config = grin::ServerConfig {
|
||||
db_root: format!("target/{}/grin-sync-{}", test_name_dir, n),
|
||||
p2p_config: Some(p2p::P2PConfig {
|
||||
port: 11000 + n,
|
||||
..p2p::P2PConfig::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
if n == 1 {
|
||||
config.seeding_type = grin::Seeding::Programmatic;
|
||||
}
|
||||
let s = grin::Server::future(config, &handle).unwrap();
|
||||
servers.push(s);
|
||||
}
|
||||
|
||||
|
|
|
@ -392,11 +392,12 @@ fn wallet_command(wallet_args: &ArgMatches) {
|
|||
.expect("Amount to send required")
|
||||
.parse()
|
||||
.expect("Could not parse amount as a whole number.");
|
||||
let minimum_confirmations: u64 = send_args
|
||||
.value_of("minimum_confirmations")
|
||||
.unwrap_or("1")
|
||||
.parse()
|
||||
.expect("Could not parse minimum_confirmations as a whole number.");
|
||||
let minimum_confirmations: u64 =
|
||||
send_args
|
||||
.value_of("minimum_confirmations")
|
||||
.unwrap_or("1")
|
||||
.parse()
|
||||
.expect("Could not parse minimum_confirmations as a whole number.");
|
||||
let mut dest = "stdout";
|
||||
if let Some(d) = send_args.value_of("dest") {
|
||||
dest = d;
|
||||
|
@ -406,7 +407,7 @@ fn wallet_command(wallet_args: &ArgMatches) {
|
|||
&keychain,
|
||||
amount,
|
||||
minimum_confirmations,
|
||||
dest.to_string()
|
||||
dest.to_string(),
|
||||
).unwrap();
|
||||
}
|
||||
("burn", Some(send_args)) => {
|
||||
|
@ -415,17 +416,14 @@ fn wallet_command(wallet_args: &ArgMatches) {
|
|||
.expect("Amount to burn required")
|
||||
.parse()
|
||||
.expect("Could not parse amount as a whole number.");
|
||||
let minimum_confirmations: u64 = send_args
|
||||
.value_of("minimum_confirmations")
|
||||
.unwrap_or("1")
|
||||
.parse()
|
||||
.expect("Could not parse minimum_confirmations as a whole number.");
|
||||
wallet::issue_burn_tx(
|
||||
&wallet_config,
|
||||
&keychain,
|
||||
amount,
|
||||
minimum_confirmations,
|
||||
).unwrap();
|
||||
let minimum_confirmations: u64 =
|
||||
send_args
|
||||
.value_of("minimum_confirmations")
|
||||
.unwrap_or("1")
|
||||
.parse()
|
||||
.expect("Could not parse minimum_confirmations as a whole number.");
|
||||
wallet::issue_burn_tx(&wallet_config, &keychain, amount, minimum_confirmations)
|
||||
.unwrap();
|
||||
}
|
||||
("info", Some(_)) => {
|
||||
wallet::show_info(&wallet_config, &keychain);
|
||||
|
|
Loading…
Add table
Reference in a new issue