From bf48e523746bd10ca5d986e60eb1b5451dfd5453 Mon Sep 17 00:00:00 2001 From: Yeastplume Date: Thu, 24 Feb 2022 15:20:35 +0000 Subject: [PATCH] [PIBD_IMPL] Update number of simultaneous peer requests for segments (#3696) * cleanup of segment request list * allow for more simultaneous requests during state sync * up number of simultaneous peer requests for segments --- chain/src/txhashset/desegmenter.rs | 34 ++++++++---------- servers/src/common/adapters.rs | 52 +++++++++++++++++----------- servers/src/grin/sync/state_sync.rs | 53 ++++++++++++++--------------- 3 files changed, 72 insertions(+), 67 deletions(-) diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index 43454ed86..837170103 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -467,9 +467,8 @@ impl Desegmenter { } } else { // We have all required bitmap segments and have recreated our local - // bitmap, now continue with other segments - // TODO: Outputs only for now, just for testing. we'll want to evenly spread - // requests among the 3 PMMRs + // bitmap, now continue with other segments, evenly spreading requests + // among MMRs let local_output_mmr_size; let local_kernel_mmr_size; let local_rangeproof_mmr_size; @@ -485,6 +484,7 @@ impl Desegmenter { self.default_output_segment_height, ); + let mut elems_added = 0; while let Some(output_id) = output_identifier_iter.next() { // Advance output iterator to next needed position let (_first, last) = @@ -492,14 +492,11 @@ impl Desegmenter { if last <= local_output_mmr_size { continue; } - // Break if we're full - if return_vec.len() > max_elements { - break; - } - if !self.has_output_segment_with_id(output_id) { return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id)); - // Let other trees have a chance to put in a segment request + elems_added += 1; + } + if elems_added == max_elements / 3 { break; } } @@ -509,20 +506,18 @@ impl Desegmenter { self.default_rangeproof_segment_height, ); + elems_added = 0; while let Some(rp_id) = rangeproof_identifier_iter.next() { let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size); // Advance rangeproof iterator to next needed position if last <= local_rangeproof_mmr_size { continue; } - // Break if we're full - if return_vec.len() > max_elements { - break; - } - if !self.has_rangeproof_segment_with_id(rp_id) { return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id)); - // Let other trees have a chance to put in a segment request + elems_added += 1; + } + if elems_added == max_elements / 3 { break; } } @@ -532,6 +527,7 @@ impl Desegmenter { self.default_kernel_segment_height, ); + elems_added = 0; while let Some(k_id) = kernel_identifier_iter.next() { // Advance kernel iterator to next needed position let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size); @@ -539,13 +535,11 @@ impl Desegmenter { if last <= local_kernel_mmr_size { continue; } - // Break if we're full - if return_vec.len() > max_elements { - break; - } - if !self.has_kernel_segment_with_id(k_id) { return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id)); + elems_added += 1; + } + if elems_added == max_elements / 3 { break; } } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index dcf48aa3f..34d98eb2e 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -577,16 +577,11 @@ where block_hash, output_root ); - // Remove segment from outgoing list TODO: Where is the best place to - // do this? - self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { - segment_type: SegmentType::Bitmap, - identifier: segment.identifier(), - }); // TODO: Entire process needs to be restarted if the horizon block // has changed (perhaps not here, NB this has to go somewhere) let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); + let mut retval = Ok(true); if let Some(d) = self .chain() .desegmenter(&archive_header, self.sync_state.clone())? @@ -599,10 +594,15 @@ where "Validation of incoming bitmap segment failed: {:?}, reason: {}", identifier, e ); - return Err(e); + retval = Err(e); } } - Ok(true) + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Bitmap, + identifier, + }); + retval } fn receive_output_segment( @@ -617,14 +617,9 @@ where block_hash, bitmap_root, ); - // Remove segment from outgoing list TODO: Where is the best place to - // do this? - self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { - segment_type: SegmentType::Output, - identifier: segment.identifier(), - }); let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); + let mut retval = Ok(true); if let Some(d) = self .chain() .desegmenter(&archive_header, self.sync_state.clone())? @@ -637,10 +632,15 @@ where "Validation of incoming output segment failed: {:?}, reason: {}", identifier, e ); - return Err(e); + retval = Err(e); } } - Ok(true) + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Output, + identifier, + }); + retval } fn receive_rangeproof_segment( @@ -655,6 +655,7 @@ where ); let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); + let mut retval = Ok(true); if let Some(d) = self .chain() .desegmenter(&archive_header, self.sync_state.clone())? @@ -667,10 +668,15 @@ where "Validation of incoming rangeproof segment failed: {:?}, reason: {}", identifier, e ); - return Err(e); + retval = Err(e); } } - Ok(true) + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::RangeProof, + identifier, + }); + retval } fn receive_kernel_segment( @@ -685,6 +691,7 @@ where ); let archive_header = self.chain().txhashset_archive_header_header_only()?; let identifier = segment.identifier().clone(); + let mut retval = Ok(true); if let Some(d) = self .chain() .desegmenter(&archive_header, self.sync_state.clone())? @@ -697,10 +704,15 @@ where "Validation of incoming rangeproof segment failed: {:?}, reason: {}", identifier, e ); - return Err(e); + retval = Err(e); } } - Ok(true) + // Remove segment from outgoing list + self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { + segment_type: SegmentType::Kernel, + identifier, + }); + retval } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index 75c0a74bf..73e3cd905 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -243,7 +243,7 @@ impl StateSync { // Figure out the next segments we need // (12 is divisible by 3, to try and evenly spread the requests among the 3 // main pmmrs. Bitmaps segments will always be requested first) - next_segment_ids = d.next_desired_segments(12); + next_segment_ids = d.next_desired_segments(15); } // For each segment, pick a desirable peer and send message @@ -271,34 +271,33 @@ impl StateSync { }); trace!("Chosen peer is {:?}", peer); if let Some(p) = peer { - match seg_id.segment_type { - SegmentType::Bitmap => p - .send_bitmap_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ) - .unwrap(), - SegmentType::Output => p - .send_output_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ) - .unwrap(), - SegmentType::RangeProof => p - .send_rangeproof_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ) - .unwrap(), - SegmentType::Kernel => p - .send_kernel_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ) - .unwrap(), - }; // add to list of segments that are being tracked self.sync_state.add_pibd_segment(seg_id); + let res = match seg_id.segment_type { + SegmentType::Bitmap => p.send_bitmap_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::Output => p.send_output_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::RangeProof => p.send_rangeproof_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::Kernel => p.send_kernel_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + }; + if let Err(e) = res { + info!( + "Error sending request to peer at {}, reason: {:?}", + p.info.addr, e + ); + self.sync_state.remove_pibd_segment(seg_id); + } } } false