mirror of
https://github.com/mimblewimble/grin.git
synced 2025-05-11 19:51:18 +03:00
Attempt to recover from isolated orphaning (#724)
Due to occasional bad network conditions, a node can miss a block. When the next one is received and detected orphaned, we should at least try a request for the previous block, assuming it's not already an orphan in itself and we're not syncing already. No additinal tests were implemented, test like `simulate_full_sync` test this functionality because sync starts later. It seems to be an issue with sync test coverage. As fix I'd suggest to start sync as soon as we get a peer connection. [https://github.com/mimblewimble/grin/issues/705]
This commit is contained in:
parent
05d1c6c817
commit
452996a421
2 changed files with 63 additions and 37 deletions
|
@ -611,4 +611,9 @@ impl Chain {
|
|||
let head = self.head.lock().unwrap();
|
||||
store::DifficultyIter::from(head.last_block_h, self.store.clone())
|
||||
}
|
||||
|
||||
/// Check whether we have a block without reading it
|
||||
pub fn block_exists(&self, h: Hash) -> Result<bool, Error> {
|
||||
self.store.block_exists(&h).map_err(|e| Error::StoreErr(e, "chain block exists".to_owned()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,9 +88,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
|||
b.header.height,
|
||||
addr,
|
||||
);
|
||||
|
||||
self.process_block(b)
|
||||
}
|
||||
self.process_block(b, addr)
|
||||
}
|
||||
|
||||
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
|
||||
let bhash = cb.hash();
|
||||
|
@ -106,7 +105,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
|||
let block = core::Block::hydrate_from(cb, vec![]);
|
||||
|
||||
// push the freshly hydrated block through the chain pipeline
|
||||
self.process_block(block)
|
||||
self.process_block(block, addr)
|
||||
} else {
|
||||
// TODO - do we need to validate the header here?
|
||||
|
||||
|
@ -130,7 +129,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
|
|||
|
||||
if let Ok(()) = block.validate() {
|
||||
debug!(LOGGER, "adapter: successfully hydrated block from tx pool!");
|
||||
self.process_block(block)
|
||||
self.process_block(block, addr)
|
||||
} else {
|
||||
debug!(LOGGER, "adapter: block invalid after hydration, requesting full block");
|
||||
self.request_block(&cb.header, &addr);
|
||||
|
@ -376,20 +375,31 @@ impl NetToChainAdapter {
|
|||
|
||||
// pushing the new block through the chain pipeline
|
||||
// remembering to reset the head if we have a bad block
|
||||
fn process_block(&self, b: core::Block) -> bool {
|
||||
let bhash = b.hash();
|
||||
let chain = w(&self.chain);
|
||||
let res = chain.process_block(b, self.chain_opts());
|
||||
if let Err(ref e) = res {
|
||||
debug!(LOGGER, "Block {} refused by chain: {:?}", bhash, e);
|
||||
if e.is_bad_data() {
|
||||
debug!(LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash);
|
||||
let _ = chain.reset_head();
|
||||
return false;
|
||||
}
|
||||
};
|
||||
true
|
||||
}
|
||||
fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
|
||||
let prev_hash = b.header.previous;
|
||||
let bhash = b.hash();
|
||||
let chain = w(&self.chain);
|
||||
match chain.process_block(b, self.chain_opts()) {
|
||||
Ok(_) => true,
|
||||
Err(chain::Error::Orphan) => {
|
||||
// make sure we did not miss the parent block
|
||||
if !self.currently_syncing.load(Ordering::Relaxed) && !chain.is_orphan(&prev_hash) {
|
||||
debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash);
|
||||
self.request_block_by_hash(prev_hash, &addr)
|
||||
}
|
||||
true
|
||||
}
|
||||
Err(ref e) if e.is_bad_data() => {
|
||||
debug!(LOGGER, "adapter: process_block: {} is a bad block, resetting head", bhash);
|
||||
let _ = chain.reset_head();
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(LOGGER, "adapter: process_block :block {} refused by chain: {:?}", bhash, e);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// After receiving a compact block if we cannot successfully hydrate
|
||||
// it into a full block then fallback to requesting the full block
|
||||
|
@ -398,17 +408,13 @@ impl NetToChainAdapter {
|
|||
// TODO - currently only request block from a single peer
|
||||
// consider additional peers for redundancy?
|
||||
fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
|
||||
if let None = wo(&self.peers).adapter.get_block(bh.hash()) {
|
||||
if let Some(peer) = wo(&self.peers).get_connected_peer(addr) {
|
||||
if let Ok(peer) = peer.read() {
|
||||
let _ = peer.send_block_request(bh.hash());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!(LOGGER, "request_block: block {} already known", bh.hash());
|
||||
}
|
||||
self.request_block_by_hash(bh.hash(), addr)
|
||||
}
|
||||
|
||||
fn request_block_by_hash(&self, h: Hash, addr: &SocketAddr) {
|
||||
self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h))
|
||||
}
|
||||
|
||||
// After we have received a block header in "header first" propagation
|
||||
// we need to go request the block (compact representation) from the
|
||||
// same peer that gave us the header (unless we have already accepted the block)
|
||||
|
@ -416,17 +422,32 @@ impl NetToChainAdapter {
|
|||
// TODO - currently only request block from a single peer
|
||||
// consider additional peers for redundancy?
|
||||
fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
|
||||
if let None = wo(&self.peers).adapter.get_block(bh.hash()) {
|
||||
if let Some(peer) = wo(&self.peers).get_connected_peer(addr) {
|
||||
if let Ok(peer) = peer.read() {
|
||||
let _ = peer.send_compact_block_request(bh.hash());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!(LOGGER, "request_compact_block: block {} already known", bh.hash());
|
||||
}
|
||||
self.send_block_request_to_peer(bh.hash(), addr, |peer, h| peer.send_compact_block_request(h))
|
||||
}
|
||||
|
||||
fn send_block_request_to_peer<F>(&self, h: Hash, addr: &SocketAddr, f: F)
|
||||
where F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error> {
|
||||
match w(&self.chain).block_exists(h) {
|
||||
Ok(false) => {
|
||||
match wo(&self.peers).get_connected_peer(addr) {
|
||||
None => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, not connected", addr),
|
||||
Some(peer) => {
|
||||
match peer.read() {
|
||||
Err(e) => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, read fails: {:?}", addr, e),
|
||||
Ok(p) => {
|
||||
if let Err(e) = f(&p, h) {
|
||||
error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(true) => debug!(LOGGER, "send_block_request_to_peer: block {} already known", h),
|
||||
Err(e) => error!(LOGGER, "send_block_request_to_peer: failed to check block exists: {:?}", e)
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare options for the chain pipeline
|
||||
fn chain_opts(&self) -> chain::Options {
|
||||
let opts = if self.currently_syncing.load(Ordering::Relaxed) {
|
||||
|
|
Loading…
Add table
Reference in a new issue