[PIBD_IMPL] Update number of simultaneous peer requests for segments (#3696)

* cleanup of segment request list

* allow for more simultaneous requests during state sync

* up number of simultaneous peer requests for segments
This commit is contained in:
Yeastplume 2022-02-24 15:20:35 +00:00 committed by GitHub
parent 5630cf2e10
commit bf48e52374
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 67 deletions

View file

@ -467,9 +467,8 @@ impl Desegmenter {
}
} else {
// We have all required bitmap segments and have recreated our local
// bitmap, now continue with other segments
// TODO: Outputs only for now, just for testing. we'll want to evenly spread
// requests among the 3 PMMRs
// bitmap, now continue with other segments, evenly spreading requests
// among MMRs
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
@ -485,6 +484,7 @@ impl Desegmenter {
self.default_output_segment_height,
);
let mut elems_added = 0;
while let Some(output_id) = output_identifier_iter.next() {
// Advance output iterator to next needed position
let (_first, last) =
@ -492,14 +492,11 @@ impl Desegmenter {
if last <= local_output_mmr_size {
continue;
}
// Break if we're full
if return_vec.len() > max_elements {
break;
}
if !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
// Let other trees have a chance to put in a segment request
elems_added += 1;
}
if elems_added == max_elements / 3 {
break;
}
}
@ -509,20 +506,18 @@ impl Desegmenter {
self.default_rangeproof_segment_height,
);
elems_added = 0;
while let Some(rp_id) = rangeproof_identifier_iter.next() {
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
// Advance rangeproof iterator to next needed position
if last <= local_rangeproof_mmr_size {
continue;
}
// Break if we're full
if return_vec.len() > max_elements {
break;
}
if !self.has_rangeproof_segment_with_id(rp_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
// Let other trees have a chance to put in a segment request
elems_added += 1;
}
if elems_added == max_elements / 3 {
break;
}
}
@ -532,6 +527,7 @@ impl Desegmenter {
self.default_kernel_segment_height,
);
elems_added = 0;
while let Some(k_id) = kernel_identifier_iter.next() {
// Advance kernel iterator to next needed position
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
@ -539,13 +535,11 @@ impl Desegmenter {
if last <= local_kernel_mmr_size {
continue;
}
// Break if we're full
if return_vec.len() > max_elements {
break;
}
if !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
if elems_added == max_elements / 3 {
break;
}
}

View file

@ -577,16 +577,11 @@ where
block_hash,
output_root
);
// Remove segment from outgoing list TODO: Where is the best place to
// do this?
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Bitmap,
identifier: segment.identifier(),
});
// TODO: Entire process needs to be restarted if the horizon block
// has changed (perhaps not here, NB this has to go somewhere)
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
@ -599,10 +594,15 @@ where
"Validation of incoming bitmap segment failed: {:?}, reason: {}",
identifier, e
);
return Err(e);
retval = Err(e);
}
}
Ok(true)
// Remove segment from outgoing list
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Bitmap,
identifier,
});
retval
}
fn receive_output_segment(
@ -617,14 +617,9 @@ where
block_hash,
bitmap_root,
);
// Remove segment from outgoing list TODO: Where is the best place to
// do this?
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Output,
identifier: segment.identifier(),
});
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
@ -637,10 +632,15 @@ where
"Validation of incoming output segment failed: {:?}, reason: {}",
identifier, e
);
return Err(e);
retval = Err(e);
}
}
Ok(true)
// Remove segment from outgoing list
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Output,
identifier,
});
retval
}
fn receive_rangeproof_segment(
@ -655,6 +655,7 @@ where
);
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
@ -667,10 +668,15 @@ where
"Validation of incoming rangeproof segment failed: {:?}, reason: {}",
identifier, e
);
return Err(e);
retval = Err(e);
}
}
Ok(true)
// Remove segment from outgoing list
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::RangeProof,
identifier,
});
retval
}
fn receive_kernel_segment(
@ -685,6 +691,7 @@ where
);
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let identifier = segment.identifier().clone();
let mut retval = Ok(true);
if let Some(d) = self
.chain()
.desegmenter(&archive_header, self.sync_state.clone())?
@ -697,10 +704,15 @@ where
"Validation of incoming kernel segment failed: {:?}, reason: {}",
identifier, e
);
return Err(e);
retval = Err(e);
}
}
Ok(true)
// Remove segment from outgoing list
self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
segment_type: SegmentType::Kernel,
identifier,
});
retval
}
}

View file

@ -243,7 +243,7 @@ impl StateSync {
// Figure out the next segments we need
// (The request count is kept divisible by 3, to try and evenly spread the
// requests among the 3 main pmmrs. Bitmap segments will always be requested first)
next_segment_ids = d.next_desired_segments(12);
next_segment_ids = d.next_desired_segments(15);
}
// For each segment, pick a desirable peer and send message
@ -271,34 +271,33 @@ impl StateSync {
});
trace!("Chosen peer is {:?}", peer);
if let Some(p) = peer {
match seg_id.segment_type {
SegmentType::Bitmap => p
.send_bitmap_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
SegmentType::Output => p
.send_output_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
SegmentType::RangeProof => p
.send_rangeproof_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
SegmentType::Kernel => p
.send_kernel_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
)
.unwrap(),
};
// add to list of segments that are being tracked
self.sync_state.add_pibd_segment(seg_id);
let res = match seg_id.segment_type {
SegmentType::Bitmap => p.send_bitmap_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::Output => p.send_output_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::RangeProof => p.send_rangeproof_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
SegmentType::Kernel => p.send_kernel_segment_request(
archive_header.hash(),
seg_id.identifier.clone(),
),
};
if let Err(e) = res {
info!(
"Error sending request to peer at {}, reason: {:?}",
p.info.addr, e
);
self.sync_state.remove_pibd_segment(seg_id);
}
}
}
false