Sync improvements ()

* Added additional per-message-type size limits
* Externalized the sync loop state; check whether we have received everything we asked for
* Adjusted some message sizes; log an error when a message is too short
* 4x the limits to give ourselves more headroom
* No need for sync to wait at startup if we already have enough peers (a small sketch follows below)
* Tolerate a few fewer headers before considering we have received them all
Ignotus Peverell 2018-04-03 20:21:13 +00:00 committed by GitHub
parent 0596b753a0
commit 85ee5041aa
4 changed files with 112 additions and 25 deletions
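
As a rough, standalone sketch of the startup change mentioned above (the function name and the `peer_count` closure are illustrative stand-ins, not names from the code): instead of always sleeping 30 seconds before syncing, the loop now polls once per second and stops as soon as four most-work peers are connected, still capping the wait at 30 seconds.

```rust
use std::{thread, time::Duration};

// Hypothetical stand-in for peers.more_work_peers().len(): poll until at
// least four peers are available or 30 seconds have passed.
fn wait_for_enough_peers(mut peer_count: impl FnMut() -> usize) {
    let mut waited = 0;
    while peer_count() < 4 && waited < 30 {
        thread::sleep(Duration::from_secs(1));
        waited += 1;
    }
}

fn main() {
    // Simulate one new peer showing up per poll; the wait ends after ~3s.
    let mut peers = 0;
    wait_for_enough_peers(|| {
        peers += 1;
        peers
    });
    assert!(peers >= 4);
}
```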

@@ -40,19 +40,20 @@ pub fn run_sync(
let _ = thread::Builder::new()
.name("sync".to_string())
.spawn(move || {
let mut prev_body_sync = time::now_utc();
let mut prev_header_sync = prev_body_sync.clone();
let mut prev_fast_sync = prev_body_sync.clone() - time::Duration::seconds(5 * 60);
let mut highest_height = 0;
let mut si = SyncInfo::new();
// initial sleep to give us time to peer with some nodes
if !skip_sync_wait {
awaiting_peers.store(true, Ordering::Relaxed);
thread::sleep(Duration::from_secs(30));
let mut n = 0;
while peers.more_work_peers().len() < 4 && n < 30 {
thread::sleep(Duration::from_secs(1));
n += 1;
}
awaiting_peers.store(false, Ordering::Relaxed);
}
// fast sync has 3 states:
// fast sync has 3 "states":
// * syncing headers
// * once all headers are sync'd, requesting the txhashset state
// * once we have the state, get blocks after that
@@ -71,37 +72,31 @@ pub fn run_sync(
if most_work_height > 0 {
// we can occasionally get a most work height of 0 if read locks fail
highest_height = most_work_height;
si.highest_height = most_work_height;
}
// in archival nodes (no fast sync) we just consider we have the whole
// state already, then fast sync triggers if other peers are much
// further ahead
let fast_sync_enabled =
!archive_mode && highest_height.saturating_sub(head.height) > horizon;
!archive_mode && si.highest_height.saturating_sub(head.height) > horizon;
let current_time = time::now_utc();
if syncing {
// run the header sync every 10s
if current_time - prev_header_sync > time::Duration::seconds(10) {
if si.header_sync_due(&header_head) {
header_sync(peers.clone(), chain.clone());
prev_header_sync = current_time;
}
// run the body_sync every 5s
if !fast_sync_enabled
&& current_time - prev_body_sync > time::Duration::seconds(5)
{
if !fast_sync_enabled && si.body_sync_due(&head) {
body_sync(peers.clone(), chain.clone());
prev_body_sync = current_time;
}
// run fast sync if applicable, every 5 min
if fast_sync_enabled && header_head.height == highest_height {
if current_time - prev_fast_sync > time::Duration::seconds(5 * 60) {
fast_sync(peers.clone(), chain.clone(), &header_head);
prev_fast_sync = current_time;
}
if fast_sync_enabled && header_head.height == si.highest_height
&& si.fast_sync_due()
{
fast_sync(peers.clone(), chain.clone(), &header_head);
}
}
currently_syncing.store(syncing, Ordering::Relaxed);
@@ -344,6 +339,9 @@ fn get_locator_heights(height: u64) -> Vec<u64> {
let mut heights = vec![];
while current > 0 {
heights.push(current);
if heights.len() >= (p2p::MAX_LOCATORS as usize) - 1 {
break;
}
let next = 2u64.pow(heights.len() as u32);
current = if current > next { current - next } else { 0 }
}
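
For orientation, here is the capped locator computation from the hunk above as a self-contained snippet, with MAX_LOCATORS inlined as a plain constant so it runs on its own; the heights in the comment are simply what this sketch prints, not node output.

```rust
// Standalone copy of the capped locator logic: heights spaced at
// exponentially growing gaps, stopping one short of MAX_LOCATORS.
const MAX_LOCATORS: u32 = 20;

fn locator_heights(height: u64) -> Vec<u64> {
    let mut current = height;
    let mut heights = vec![];
    while current > 0 {
        heights.push(current);
        if heights.len() >= (MAX_LOCATORS as usize) - 1 {
            break;
        }
        let next = 2u64.pow(heights.len() as u32);
        current = if current > next { current - next } else { 0 };
    }
    heights
}

fn main() {
    // Gaps double each step: 1000000, 999998, 999994, 999986, 999970, ...
    // and the list is cut off at 19 entries regardless of chain height.
    let heights = locator_heights(1_000_000);
    println!("{:?}", heights);
    assert_eq!(heights.len(), 19);
}
```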
@@ -351,6 +349,59 @@ fn get_locator_heights(height: u64) -> Vec<u64> {
heights
}
// Utility struct to group what information the main sync loop has to track
struct SyncInfo {
prev_body_sync: (time::Tm, u64),
prev_header_sync: (time::Tm, u64),
prev_fast_sync: time::Tm,
highest_height: u64,
}
impl SyncInfo {
fn new() -> SyncInfo {
let now = time::now_utc();
SyncInfo {
prev_body_sync: (now.clone(), 0),
prev_header_sync: (now.clone(), 0),
prev_fast_sync: now.clone() - time::Duration::seconds(5 * 60),
highest_height: 0,
}
}
fn header_sync_due(&mut self, header_head: &chain::Tip) -> bool {
let now = time::now_utc();
let (prev_ts, prev_height) = self.prev_header_sync;
if header_head.height >= prev_height + (p2p::MAX_BLOCK_HEADERS as u64) - 4
|| now - prev_ts > time::Duration::seconds(10)
{
self.prev_header_sync = (now, header_head.height);
return true;
}
false
}
fn body_sync_due(&mut self, head: &chain::Tip) -> bool {
let now = time::now_utc();
let (prev_ts, prev_height) = self.prev_body_sync;
if head.height >= prev_height + 96 || now - prev_ts > time::Duration::seconds(5) {
self.prev_body_sync = (now, head.height);
return true;
}
false
}
fn fast_sync_due(&mut self) -> bool {
let now = time::now_utc();
if now - self.prev_fast_sync > time::Duration::seconds(5 * 60) {
self.prev_fast_sync = now;
return true;
}
false
}
}
#[cfg(test)]
mod test {
use super::*;
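
Put another way, header_sync_due and body_sync_due implement an "enough time elapsed OR enough height progress" trigger. Below is a hedged, self-contained restatement of that pattern (illustrative types only, using std::time and a plain u64 height instead of chain::Tip and the time crate):

```rust
use std::time::{Duration, Instant};

// Illustrative tracker, not the crate's code: fires when either the interval
// has elapsed or the height has advanced by a full batch since the last run.
struct DueTracker {
    prev: (Instant, u64),
    interval: Duration,
    height_step: u64,
}

impl DueTracker {
    fn new(interval: Duration, height_step: u64) -> DueTracker {
        DueTracker {
            prev: (Instant::now(), 0),
            interval,
            height_step,
        }
    }

    fn due(&mut self, height: u64) -> bool {
        let now = Instant::now();
        let (prev_ts, prev_height) = self.prev;
        if height >= prev_height + self.height_step || now.duration_since(prev_ts) > self.interval {
            self.prev = (now, height);
            return true;
        }
        false
    }
}

fn main() {
    // Same numbers as body_sync_due: every 5s, or as soon as 96 new blocks land.
    let mut body_due = DueTracker::new(Duration::from_secs(5), 96);
    assert!(body_due.due(100)); // jumped past the 96-block step: due immediately
    assert!(!body_due.due(150)); // only 50 further and the 5s have not elapsed
}
```

Keeping the previous height next to the timestamp is what lets the loop react as soon as a full batch lands instead of idling until the next tick.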

@@ -53,5 +53,5 @@ pub use serv::{DummyAdapter, Server};
pub use peers::Peers;
pub use peer::Peer;
pub use types::{Capabilities, ChainAdapter, Error, P2PConfig, PeerInfo, TxHashSetRead,
MAX_BLOCK_HEADERS, MAX_PEER_ADDRS};
MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS};
pub use store::{PeerData, State};

@@ -20,13 +20,14 @@ use std::thread;
use std::time;
use num::FromPrimitive;
use core::consensus::MAX_MSG_LEN;
use core::consensus::{MAX_MSG_LEN, MAX_TX_INPUTS, MAX_TX_KERNELS, MAX_TX_OUTPUTS};
use core::core::BlockHeader;
use core::core::hash::Hash;
use core::core::target::Difficulty;
use core::ser::{self, Readable, Reader, Writeable, Writer};
use types::*;
use util::LOGGER;
/// Current latest version of the protocol
pub const PROTOCOL_VERSION: u32 = 1;
@@ -67,10 +68,31 @@ enum_from_primitive! {
StemTransaction,
Transaction,
TxHashSetRequest,
TxHashSetArchive
TxHashSetArchive,
}
}
const MAX_MSG_SIZES: [u64; 18] = [
0, // Error
128, // Hand
88, // Shake
16, // Ping
16, // Pong
4, // GetPeerAddrs
4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64, // PeerAddrs, with all IPv6
1 + 32 * MAX_LOCATORS as u64, // GetHeaders locators
365, // Header
2 + 365 * MAX_BLOCK_HEADERS as u64, // Headers
32, // GetBlock
MAX_MSG_LEN, // Block
32, // GetCompactBlock
MAX_MSG_LEN / 10, // CompactBlock
1000 * MAX_TX_INPUTS + 710 * MAX_TX_OUTPUTS + 114 * MAX_TX_KERNELS, // StemTransaction,
1000 * MAX_TX_INPUTS + 710 * MAX_TX_OUTPUTS + 114 * MAX_TX_KERNELS, // Transaction,
40, // TxHashSetRequest
64, // TxHashSetArchive
];
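
A quick sanity check of what the table buys (illustrative helper, not part of the diff): even with the 4x headroom applied in read_header below, the 20 MB ping called out in the removed TODO is rejected, since Ping's cap is 16 bytes.

```rust
// Same check as read_header applies, extracted for illustration.
fn within_limit(max_len: u64, msg_len: u64) -> bool {
    msg_len <= max_len * 4
}

fn main() {
    let ping_cap = 16u64; // Ping's entry in MAX_MSG_SIZES above
    assert!(within_limit(ping_cap, 16));
    assert!(!within_limit(ping_cap, 20 * 1024 * 1024));
}
```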
/// The default implementation of read_exact is useless with async TcpStream as
/// it will return as soon as something has been read, regardless of
/// whether the buffer has been filled (and then errors). This implementation
@@ -166,8 +188,13 @@ pub fn read_header(conn: &mut TcpStream) -> Result<MsgHeader, Error> {
let mut head = vec![0u8; HEADER_LEN as usize];
read_exact(conn, &mut head, 10000, false)?;
let header = ser::deserialize::<MsgHeader>(&mut &head[..])?;
if header.msg_len > MAX_MSG_LEN {
// TODO additional restrictions for each msg type to avoid 20MB pings...
let max_len = MAX_MSG_SIZES[header.msg_type as usize];
// TODO 4x the limits for now to leave ourselves space to change things
if header.msg_len > max_len * 4 {
error!(
LOGGER,
"Too large read {}, had {}, wanted {}.", header.msg_type as u8, max_len, header.msg_len
);
return Err(Error::Serialization(ser::Error::TooLargeReadErr));
}
Ok(header)
@@ -541,6 +568,9 @@ impl Writeable for Locator {
impl Readable for Locator {
fn read(reader: &mut Reader) -> Result<Locator, ser::Error> {
let len = reader.read_u8()?;
if len > (MAX_LOCATORS as u8) {
return Err(ser::Error::TooLargeReadErr);
}
let mut hashes = Vec::with_capacity(len as usize);
for _ in 0..len {
hashes.push(Hash::read(reader)?);
@@ -567,6 +597,9 @@ impl Writeable for Headers {
impl Readable for Headers {
fn read(reader: &mut Reader) -> Result<Headers, ser::Error> {
let len = reader.read_u16()?;
if (len as u32) > MAX_BLOCK_HEADERS + 1 {
return Err(ser::Error::TooLargeReadErr);
}
let mut headers = Vec::with_capacity(len as usize);
for _ in 0..len {
headers.push(BlockHeader::read(reader)?);
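
Both Readable impls above follow the same defensive pattern: validate an attacker-controlled length prefix against a hard cap before allocating or looping on it. A generic sketch of that pattern (hypothetical helper, not from the codebase):

```rust
// Hypothetical helper illustrating the pattern: reject an oversized length
// prefix before reserving capacity or reading any elements.
fn read_bounded<T, E>(
    len: usize,
    max: usize,
    too_large: E,
    mut read_one: impl FnMut() -> Result<T, E>,
) -> Result<Vec<T>, E> {
    if len > max {
        return Err(too_large);
    }
    let mut items = Vec::with_capacity(len);
    for _ in 0..len {
        items.push(read_one()?);
    }
    Ok(items)
}

fn main() {
    let mut n = 0u32;
    let ok: Result<Vec<u32>, &str> = read_bounded(3, 20, "too large", || {
        n += 1;
        Ok(n)
    });
    assert_eq!(ok.unwrap(), vec![1, 2, 3]);

    let rejected: Result<Vec<u32>, &str> = read_bounded(500, 20, "too large", || Ok(0));
    assert!(rejected.is_err());
}
```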

@@ -34,6 +34,9 @@ pub const MAX_BLOCK_BODIES: u32 = 16;
/// Maximum number of peer addresses a peer should ever send
pub const MAX_PEER_ADDRS: u32 = 256;
/// Maximum number of block header hashes to send as part of a locator
pub const MAX_LOCATORS: u32 = 20;
/// Dandelion relay time
const DANDELION_RELAY_TIME: i64 = 600;