mirror of
https://github.com/mimblewimble/grin.git
synced 2025-02-01 17:01:09 +03:00
* improve: HeaderSync optimization (#1372) * remove get_locator() optimization, which should be an independent pr for security review * refactoring: move 'headers_streaming_body()' from Message to Protocol * move 2 headers utils functions out of Protocol, and remove 'pub' * support reading variable size of BlockHeader, from Cuckoo30 to Cuckoo36 * fix: use global::min_sizeshift() instead of hardcoded 30, because Cuckoo10 will be used for AutomatedTesting chain * fix: should use global::proofsize() instead of hardcoded 42 when calculate serialized_size_of_header * replace another 42 with global::proofsize()
This commit is contained in:
parent
6d992e61d6
commit
d719493483
4 changed files with 180 additions and 15 deletions
|
@ -19,6 +19,7 @@ use chrono::prelude::{DateTime, NaiveDateTime, Utc};
|
|||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::iter::FromIterator;
|
||||
use std::mem;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use consensus::{self, reward, REWARD};
|
||||
|
@ -144,6 +145,39 @@ pub struct BlockHeader {
|
|||
pub pow: Proof,
|
||||
}
|
||||
|
||||
/// Serialized size of fixed part of a BlockHeader, i.e. without pow
|
||||
fn fixed_size_of_serialized_header() -> usize {
|
||||
let mut size: usize = 0;
|
||||
size += mem::size_of::<u16>(); // version
|
||||
size += mem::size_of::<u64>(); // height
|
||||
size += mem::size_of::<Hash>(); // previous
|
||||
size += mem::size_of::<u64>(); // timestamp
|
||||
size += mem::size_of::<Difficulty>(); // total_difficulty
|
||||
size += mem::size_of::<Hash>(); // output_root
|
||||
size += mem::size_of::<Hash>(); // range_proof_root
|
||||
size += mem::size_of::<Hash>(); // kernel_root
|
||||
size += mem::size_of::<BlindingFactor>(); // total_kernel_offset
|
||||
size += mem::size_of::<Commitment>(); // total_kernel_sum
|
||||
size += mem::size_of::<u64>(); // output_mmr_size
|
||||
size += mem::size_of::<u64>(); // kernel_mmr_size
|
||||
size += mem::size_of::<u64>(); // nonce
|
||||
size
|
||||
}
|
||||
|
||||
/// Serialized size of a BlockHeader
|
||||
pub fn serialized_size_of_header(cuckoo_sizeshift: u8) -> usize {
|
||||
let mut size = fixed_size_of_serialized_header();
|
||||
|
||||
size += mem::size_of::<u8>(); // pow.cuckoo_sizeshift
|
||||
let nonce_bits = cuckoo_sizeshift as usize - 1;
|
||||
let bitvec_len = global::proofsize() * nonce_bits;
|
||||
size += bitvec_len / 8; // pow.nonces
|
||||
if bitvec_len % 8 != 0 {
|
||||
size += 1;
|
||||
}
|
||||
size
|
||||
}
|
||||
|
||||
impl Default for BlockHeader {
|
||||
fn default() -> BlockHeader {
|
||||
let proof_size = global::proofsize();
|
||||
|
@ -267,6 +301,20 @@ impl BlockHeader {
|
|||
pub fn total_kernel_offset(&self) -> BlindingFactor {
|
||||
self.total_kernel_offset
|
||||
}
|
||||
|
||||
/// Serialized size of this header
|
||||
pub fn serialized_size(&self) -> usize {
|
||||
let mut size = fixed_size_of_serialized_header();
|
||||
|
||||
size += mem::size_of::<u8>(); // pow.cuckoo_sizeshift
|
||||
let nonce_bits = self.pow.cuckoo_sizeshift as usize - 1;
|
||||
let bitvec_len = global::proofsize() * nonce_bits;
|
||||
size += bitvec_len / 8; // pow.nonces
|
||||
if bitvec_len % 8 != 0 {
|
||||
size += 1;
|
||||
}
|
||||
size
|
||||
}
|
||||
}
|
||||
|
||||
/// A block as expressed in the MimbleWimble protocol. The reward is
|
||||
|
|
|
@ -64,6 +64,11 @@ impl<'a> Message<'a> {
|
|||
Message { header, conn }
|
||||
}
|
||||
|
||||
/// Get the TcpStream
|
||||
pub fn get_conn(&mut self) -> TcpStream {
|
||||
return self.conn.try_clone().unwrap();
|
||||
}
|
||||
|
||||
/// Read the message body from the underlying connection
|
||||
pub fn body<T>(&mut self) -> Result<T, Error>
|
||||
where
|
||||
|
|
|
@ -14,17 +14,17 @@
|
|||
|
||||
use std::env;
|
||||
use std::fs::File;
|
||||
use std::io::BufWriter;
|
||||
use std::net::SocketAddr;
|
||||
use std::io::{self, BufWriter};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conn::{Message, MessageHandler, Response};
|
||||
use core::core;
|
||||
use core::core::hash::Hash;
|
||||
use core::core::CompactBlock;
|
||||
use core::core::{self, hash::Hash, CompactBlock};
|
||||
use core::{global, ser};
|
||||
|
||||
use msg::{
|
||||
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, TxHashSetArchive,
|
||||
TxHashSetRequest, Type,
|
||||
read_exact, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr,
|
||||
TxHashSetArchive, TxHashSetRequest, Type,
|
||||
};
|
||||
use rand::{self, Rng};
|
||||
use types::{Error, NetAdapter};
|
||||
|
@ -169,10 +169,9 @@ impl MessageHandler for Protocol {
|
|||
let headers = adapter.locate_headers(loc.hashes);
|
||||
|
||||
// serialize and send all the headers over
|
||||
Ok(Some(msg.respond(
|
||||
Type::Headers,
|
||||
Headers { headers: headers },
|
||||
)))
|
||||
Ok(Some(
|
||||
msg.respond(Type::Headers, Headers { headers: headers }),
|
||||
))
|
||||
}
|
||||
|
||||
// "header first" block propagation - if we have not yet seen this block
|
||||
|
@ -188,8 +187,23 @@ impl MessageHandler for Protocol {
|
|||
}
|
||||
|
||||
Type::Headers => {
|
||||
let headers: Headers = msg.body()?;
|
||||
let conn = &mut msg.get_conn();
|
||||
|
||||
let header_size: u64 = headers_header_size(conn, msg.header.msg_len)?;
|
||||
let mut total_read: u64 = 2;
|
||||
let mut reserved: Vec<u8> = vec![];
|
||||
|
||||
while total_read < msg.header.msg_len || reserved.len() > 0 {
|
||||
let headers: Headers = headers_streaming_body(
|
||||
conn,
|
||||
msg.header.msg_len,
|
||||
8,
|
||||
&mut total_read,
|
||||
&mut reserved,
|
||||
header_size,
|
||||
)?;
|
||||
adapter.headers_received(headers.headers, self.addr);
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
|
@ -276,7 +290,8 @@ impl MessageHandler for Protocol {
|
|||
);
|
||||
|
||||
let tmp_zip = File::open(tmp)?;
|
||||
let res = self.adapter
|
||||
let res = self
|
||||
.adapter
|
||||
.txhashset_write(sm_arch.hash, tmp_zip, self.addr);
|
||||
|
||||
debug!(
|
||||
|
@ -297,3 +312,100 @@ impl MessageHandler for Protocol {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read the Headers Vec size from the underlying connection, and calculate maximum header_size of one Header
|
||||
fn headers_header_size(conn: &mut TcpStream, msg_len: u64) -> Result<u64, Error> {
|
||||
let mut size = vec![0u8; 2];
|
||||
// read size of Vec<BlockHeader>
|
||||
read_exact(conn, &mut size, 20000, true)?;
|
||||
|
||||
let total_headers = size[0] as u64 * 256 + size[1] as u64;
|
||||
if total_headers == 0 || total_headers > 10_000 {
|
||||
return Err(Error::Connection(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"headers_header_size",
|
||||
)));
|
||||
}
|
||||
let average_header_size = (msg_len - 2) / total_headers;
|
||||
|
||||
// support size of Cuckoo: from Cuckoo 30 to Cuckoo 36
|
||||
let minimum_size = core::serialized_size_of_header(global::min_sizeshift());
|
||||
let maximum_size = core::serialized_size_of_header(global::min_sizeshift() + 6);
|
||||
if average_header_size < minimum_size as u64 || average_header_size > maximum_size as u64 {
|
||||
debug!(
|
||||
LOGGER,
|
||||
"headers_header_size - size of Vec: {}, average_header_size: {}, min: {}, max: {}",
|
||||
total_headers,
|
||||
average_header_size,
|
||||
minimum_size,
|
||||
maximum_size,
|
||||
);
|
||||
return Err(Error::Connection(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
"headers_header_size",
|
||||
)));
|
||||
}
|
||||
return Ok(maximum_size as u64);
|
||||
}
|
||||
|
||||
/// Read the Headers streaming body from the underlying connection
|
||||
fn headers_streaming_body(
|
||||
conn: &mut TcpStream, // (i) underlying connection
|
||||
msg_len: u64, // (i) length of whole 'Headers'
|
||||
headers_num: u64, // (i) how many BlockHeader(s) do you want to read
|
||||
total_read: &mut u64, // (i/o) how many bytes already read on this 'Headers' message
|
||||
reserved: &mut Vec<u8>, // (i/o) reserved part of previous read, which's not a whole header
|
||||
max_header_size: u64, // (i) maximum possible size of single BlockHeader
|
||||
) -> Result<Headers, Error> {
|
||||
if headers_num == 0 || msg_len < *total_read || *total_read < 2 {
|
||||
return Err(Error::Connection(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"headers_streaming_body",
|
||||
)));
|
||||
}
|
||||
|
||||
// Note:
|
||||
// As we allow Cuckoo sizes greater than 30 now, the proof of work part of the header
|
||||
// could be 30*42 bits, 31*42 bits, 32*42 bits, etc.
|
||||
// So, for compatibility with variable size of block header, we read max possible size, for
|
||||
// up to Cuckoo 36.
|
||||
//
|
||||
let mut read_size = headers_num * max_header_size - reserved.len() as u64;
|
||||
if *total_read + read_size > msg_len {
|
||||
read_size = msg_len - *total_read;
|
||||
}
|
||||
|
||||
// 1st part
|
||||
let mut body = vec![0u8; 2]; // for Vec<> size
|
||||
let mut final_headers_num = (read_size + reserved.len() as u64) / max_header_size;
|
||||
let remaining = msg_len - *total_read - read_size;
|
||||
if final_headers_num == 0 && remaining == 0 {
|
||||
final_headers_num = 1;
|
||||
}
|
||||
body[0] = (final_headers_num >> 8) as u8;
|
||||
body[1] = (final_headers_num & 0x00ff) as u8;
|
||||
|
||||
// 2nd part
|
||||
body.append(reserved);
|
||||
|
||||
// 3rd part
|
||||
let mut read_body = vec![0u8; read_size as usize];
|
||||
if read_size > 0 {
|
||||
read_exact(conn, &mut read_body, 20000, true)?;
|
||||
*total_read += read_size;
|
||||
}
|
||||
body.append(&mut read_body);
|
||||
|
||||
// deserialize these assembled 3 parts
|
||||
let result: Result<Headers, Error> = ser::deserialize(&mut &body[..]).map_err(From::from);
|
||||
let headers = result?;
|
||||
|
||||
// remaining data
|
||||
let mut deserialized_size = 2; // for Vec<> size
|
||||
for header in &headers.headers {
|
||||
deserialized_size += header.serialized_size();
|
||||
}
|
||||
*reserved = body[deserialized_size..].to_vec();
|
||||
|
||||
Ok(headers)
|
||||
}
|
||||
|
|
|
@ -189,7 +189,7 @@ pub fn run_sync(
|
|||
sync_state.update(SyncStatus::NoSync);
|
||||
}
|
||||
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
thread::sleep(time::Duration::from_millis(10));
|
||||
|
||||
if stop.load(Ordering::Relaxed) {
|
||||
break;
|
||||
|
|
Loading…
Reference in a new issue