full node == fast sync (no full archival guarantees) (#1809)

* wip - fast sync only

* wip

* cleanup

* cleanup

* cleanup comments in default config file around capabilities

* fixup p2p tests
This commit is contained in:
Antioch Peverell 2018-10-23 13:01:19 +01:00 committed by GitHub
parent a4a4c5610f
commit 38cbd6eafb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 84 additions and 175 deletions

View file

@ -729,30 +729,11 @@ impl Chain {
Ok(()) Ok(())
} }
/// Triggers chain compaction, cleaning up some unnecessary historical fn compact_txhashset(&self) -> Result<(), Error> {
/// information. We introduce a chain depth called horizon, which is
/// typically in the range of a couple days. Before that horizon, this
/// method will:
///
/// * compact the MMRs data files and flushing the corresponding remove logs
/// * delete old records from the k/v store (older blocks, indexes, etc.)
///
/// This operation can be resource intensive and takes some time to execute.
/// Meanwhile, the chain will not be able to accept new blocks. It should
/// therefore be called judiciously.
pub fn compact(&self) -> Result<(), Error> {
if self.archive_mode {
debug!("Blockchain compaction disabled, node running in archive mode.");
return Ok(());
}
debug!("Starting blockchain compaction."); debug!("Starting blockchain compaction.");
// Compact the txhashset via the extension.
{ {
let mut txhashset = self.txhashset.write(); let mut txhashset = self.txhashset.write();
txhashset.compact()?; txhashset.compact()?;
// print out useful debug info after compaction
txhashset::extending_readonly(&mut txhashset, |extension| { txhashset::extending_readonly(&mut txhashset, |extension| {
extension.dump_output_pmmr(); extension.dump_output_pmmr();
Ok(()) Ok(())
@ -763,20 +744,33 @@ impl Chain {
// compacting, shouldn't be necessary once all of this is well-oiled // compacting, shouldn't be necessary once all of this is well-oiled
debug!("Validating state after compaction."); debug!("Validating state after compaction.");
self.validate(true)?; self.validate(true)?;
Ok(())
}
/// Cleanup old blocks from the db.
/// Determine the cutoff height from the horizon and the current block height.
/// *Only* runs if we are not in archive mode.
fn compact_blocks_db(&self) -> Result<(), Error> {
if !self.archive_mode {
return Ok(())
}
// we need to be careful here in testing as 20 blocks is not that long
// in wall clock time
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
let head = self.head()?; let head = self.head()?;
if head.height <= horizon { let cutoff = head.height.saturating_sub(horizon);
debug!(
"chain: compact_blocks_db: head height: {}, horizon: {}, cutoff: {}",
head.height,
horizon,
cutoff,
);
if cutoff == 0 {
return Ok(()); return Ok(());
} }
debug!(
"Compaction remove blocks older than {}.",
head.height - horizon
);
let mut count = 0; let mut count = 0;
let batch = self.store.batch()?; let batch = self.store.batch()?;
let mut current = batch.get_header_by_height(head.height - horizon - 1)?; let mut current = batch.get_header_by_height(head.height - horizon - 1)?;
@ -806,7 +800,22 @@ impl Chain {
} }
} }
batch.commit()?; batch.commit()?;
debug!("Compaction removed {} blocks, done.", count); debug!("chain: compact_blocks_db: removed {} blocks.", count);
Ok(())
}
/// Triggers chain compaction.
///
/// * compacts the txhashset based on current prune_list
/// * removes historical blocks and associated data from the db (unless archive mode)
///
pub fn compact(&self) -> Result<(), Error> {
self.compact_txhashset()?;
if !self.archive_mode {
self.compact_blocks_db()?;
}
Ok(()) Ok(())
} }

View file

@ -213,7 +213,6 @@ fn comments() -> HashMap<String, String> {
#peer_min_preferred_count = 8 #peer_min_preferred_count = 8
# 7 = Bit flags for FULL_NODE # 7 = Bit flags for FULL_NODE
# 6 = Bit flags for FAST_SYNC_NODE
#This structure needs to be changed internally, to make it more configurable #This structure needs to be changed internally, to make it more configurable
".to_string(), ".to_string(),
); );

View file

@ -233,8 +233,7 @@ impl GlobalConfig {
file.read_to_string(&mut contents)?; file.read_to_string(&mut contents)?;
let decoded: Result<ConfigMembers, toml::de::Error> = toml::from_str(&contents); let decoded: Result<ConfigMembers, toml::de::Error> = toml::from_str(&contents);
match decoded { match decoded {
Ok(mut gc) => { Ok(gc) => {
gc.server.validation_check();
self.members = Some(gc); self.members = Some(gc);
return Ok(self); return Ok(self);
} }

View file

@ -79,9 +79,11 @@ pub const PEER_EXPIRATION_REMOVE_TIME: i64 = PEER_EXPIRATION_DAYS * 24 * 3600;
/// 1_000 times natural scale factor for cuckatoo29 /// 1_000 times natural scale factor for cuckatoo29
pub const TESTNET4_INITIAL_DIFFICULTY: u64 = 1_000 * (2 << (29 - 24)) * 29; pub const TESTNET4_INITIAL_DIFFICULTY: u64 = 1_000 * (2 << (29 - 24)) * 29;
/// Trigger compaction check on average every day for FAST_SYNC_NODE, /// Trigger compaction check on average every day for all nodes.
/// roll the dice on every block to decide, /// Randomized per node - roll the dice on every block to decide.
/// all blocks lower than (BodyHead.height - CUT_THROUGH_HORIZON) will be removed. /// Will compact the txhashset to remove pruned data.
/// Will also remove old blocks and associated data from the database.
/// For a node configured as "archival_mode = true" only the txhashset will be compacted.
pub const COMPACTION_CHECK: u64 = DAY_HEIGHT; pub const COMPACTION_CHECK: u64 = DAY_HEIGHT;
/// Types of chain a server can run with, dictates the genesis block and /// Types of chain a server can run with, dictates the genesis block and

View file

@ -168,37 +168,11 @@ impl Peers {
max_peers max_peers
} }
// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do and are also full archival nodes.
pub fn more_work_archival_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
}
let total_difficulty = self.total_difficulty();
let mut max_peers = peers
.into_iter()
.filter(|x| {
x.info.total_difficulty() > total_difficulty
&& x.info.capabilities.contains(Capabilities::FULL_HIST)
}).collect::<Vec<_>>();
thread_rng().shuffle(&mut max_peers);
max_peers
}
/// Returns single random peer with more work than us. /// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<Peer>> { pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
self.more_work_peers().pop() self.more_work_peers().pop()
} }
/// Returns single random archival peer with more work than us.
pub fn more_work_archival_peer(&self) -> Option<Arc<Peer>> {
self.more_work_archival_peers().pop()
}
/// Return vec of connected peers that currently have the most worked /// Return vec of connected peers that currently have the most worked
/// branch, showing the highest total difficulty. /// branch, showing the highest total difficulty.
pub fn most_work_peers(&self) -> Vec<Arc<Peer>> { pub fn most_work_peers(&self) -> Vec<Arc<Peer>> {

View file

@ -49,32 +49,12 @@ impl Server {
/// Creates a new idle p2p server with no peers /// Creates a new idle p2p server with no peers
pub fn new( pub fn new(
db_env: Arc<lmdb::Environment>, db_env: Arc<lmdb::Environment>,
mut capab: Capabilities, capab: Capabilities,
config: P2PConfig, config: P2PConfig,
adapter: Arc<ChainAdapter>, adapter: Arc<ChainAdapter>,
genesis: Hash, genesis: Hash,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
_archive_mode: bool,
block_1_hash: Option<Hash>,
) -> Result<Server, Error> { ) -> Result<Server, Error> {
// In the case of an archive node, check that we do have the first block.
// In case of first sync we do not perform this check.
if capab.contains(Capabilities::FULL_HIST) && adapter.total_height() > 0 {
// Check that we have block 1
match block_1_hash {
Some(hash) => match adapter.get_block(hash) {
Some(_) => debug!("Full block 1 found, archive capabilities confirmed"),
None => {
debug!("Full block 1 not found, archive capabilities disabled");
capab.remove(Capabilities::FULL_HIST);
}
},
None => {
debug!("Block 1 not found, archive capabilities disabled");
capab.remove(Capabilities::FULL_HIST);
}
}
}
Ok(Server { Ok(Server {
config: config.clone(), config: config.clone(),
capabilities: capab, capabilities: capab,

View file

@ -132,7 +132,7 @@ impl Default for P2PConfig {
P2PConfig { P2PConfig {
host: ipaddr, host: ipaddr,
port: 13414, port: 13414,
capabilities: Capabilities::FAST_SYNC_NODE, capabilities: Capabilities::FULL_NODE,
seeding_type: Seeding::default(), seeding_type: Seeding::default(),
seeds: None, seeds: None,
peers_allow: None, peers_allow: None,
@ -193,26 +193,27 @@ impl Default for Seeding {
} }
bitflags! { bitflags! {
/// Options for what type of interaction a peer supports /// Options for what type of interaction a peer supports
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Capabilities: u32 { pub struct Capabilities: u32 {
/// We don't know (yet) what the peer can do. /// We don't know (yet) what the peer can do.
const UNKNOWN = 0b00000000; const UNKNOWN = 0b00000000;
/// Full archival node, has the whole history without any pruning. /// Can provide full history of headers back to genesis
const FULL_HIST = 0b00000001; /// (for at least one arbitrary fork).
/// Can provide block headers and the TxHashSet for some recent-enough const HEADER_HIST = 0b00000001;
/// height. /// Can provide block headers and the TxHashSet for some recent-enough
const TXHASHSET_HIST = 0b00000010; /// height.
/// Can provide a list of healthy peers const TXHASHSET_HIST = 0b00000010;
const PEER_LIST = 0b00000100; /// Can provide a list of healthy peers
const PEER_LIST = 0b00000100;
const FAST_SYNC_NODE = Capabilities::TXHASHSET_HIST.bits /// All nodes right now are "full nodes".
| Capabilities::PEER_LIST.bits; /// Some nodes internally may maintain longer block histories (archival_mode)
/// but we do not advertise this to other nodes.
const FULL_NODE = Capabilities::FULL_HIST.bits const FULL_NODE = Capabilities::HEADER_HIST.bits
| Capabilities::TXHASHSET_HIST.bits | Capabilities::TXHASHSET_HIST.bits
| Capabilities::PEER_LIST.bits; | Capabilities::PEER_LIST.bits;
} }
} }
/// Types of connection /// Types of connection

View file

@ -58,8 +58,6 @@ fn peer_handshake() {
net_adapter.clone(), net_adapter.clone(),
Hash::from_vec(&vec![]), Hash::from_vec(&vec![]),
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
false,
None,
).unwrap(), ).unwrap(),
); );

View file

@ -433,14 +433,11 @@ impl NetToChainAdapter {
// pushing the new block through the chain pipeline // pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block // remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool { fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
if !self.archive_mode { // We cannot process blocks earlier than the horizon so check for this here.
{
let head = self.chain().head().unwrap(); let head = self.chain().head().unwrap();
// we have a fast sync'd node and are sent a block older than our horizon, let horizon = head.height.saturating_sub(global::cut_through_horizon() as u64);
// only sync can do something with that if b.header.height < horizon {
if b.header.height < head
.height
.saturating_sub(global::cut_through_horizon() as u64)
{
return true; return true;
} }
} }

View file

@ -160,28 +160,6 @@ pub struct ServerConfig {
pub stratum_mining_config: Option<StratumServerConfig>, pub stratum_mining_config: Option<StratumServerConfig>,
} }
impl ServerConfig {
/// Configuration items validation check
pub fn validation_check(&mut self) {
// check [server.p2p_config.capabilities] with 'archive_mode' in [server]
if let Some(archive) = self.archive_mode {
// note: slog not available before config loaded, only print here.
if archive != self
.p2p_config
.capabilities
.contains(p2p::Capabilities::FULL_HIST)
{
// if conflict, 'archive_mode' win
self.p2p_config
.capabilities
.toggle(p2p::Capabilities::FULL_HIST);
}
}
// todo: other checks if needed
}
}
impl Default for ServerConfig { impl Default for ServerConfig {
fn default() -> ServerConfig { fn default() -> ServerConfig {
ServerConfig { ServerConfig {

View file

@ -238,7 +238,7 @@ fn connect_to_seeds_and_preferred_peers(
peers_preferred_list: Option<Vec<SocketAddr>>, peers_preferred_list: Option<Vec<SocketAddr>>,
) { ) {
// check if we have some peers in db // check if we have some peers in db
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::FULL_HIST, 100); let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::FULL_NODE, 100);
// if so, get their addresses, otherwise use our seeds // if so, get their addresses, otherwise use our seeds
let mut peer_addrs = if peers.len() > 3 { let mut peer_addrs = if peers.len() > 3 {

View file

@ -29,7 +29,6 @@ use common::adapters::{
}; };
use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats}; use common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats};
use common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus}; use common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus};
use core::core::hash::Hashed;
use core::core::verifier_cache::{LruVerifierCache, VerifierCache}; use core::core::verifier_cache::{LruVerifierCache, VerifierCache};
use core::{consensus, genesis, global, pow}; use core::{consensus, genesis, global, pow};
use grin::{dandelion_monitor, seed, sync}; use grin::{dandelion_monitor, seed, sync};
@ -111,18 +110,6 @@ impl Server {
Some(b) => b, Some(b) => b,
}; };
// If archive mode is enabled then the flags should contains the FULL_HIST flag
if archive_mode && !config
.p2p_config
.capabilities
.contains(p2p::Capabilities::FULL_HIST)
{
config
.p2p_config
.capabilities
.insert(p2p::Capabilities::FULL_HIST);
}
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
// Shared cache for verification results. // Shared cache for verification results.
@ -179,11 +166,6 @@ impl Server {
config.clone(), config.clone(),
)); ));
let block_1_hash = match shared_chain.get_header_by_height(1) {
Ok(header) => Some(header.hash()),
Err(_) => None,
};
let peer_db_env = Arc::new(store::new_named_env(config.db_root.clone(), "peer".into())); let peer_db_env = Arc::new(store::new_named_env(config.db_root.clone(), "peer".into()));
let p2p_server = Arc::new(p2p::Server::new( let p2p_server = Arc::new(p2p::Server::new(
peer_db_env, peer_db_env,
@ -192,8 +174,6 @@ impl Server {
net_adapter.clone(), net_adapter.clone(),
genesis.hash(), genesis.hash(),
stop.clone(), stop.clone(),
archive_mode,
block_1_hash,
)?); )?);
chain_adapter.init(p2p_server.peers.clone()); chain_adapter.init(p2p_server.peers.clone());
pool_net_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone());
@ -236,7 +216,6 @@ impl Server {
sync_state.clone(), sync_state.clone(),
p2p_server.peers.clone(), p2p_server.peers.clone(),
shared_chain.clone(), shared_chain.clone(),
archive_mode,
stop.clone(), stop.clone(),
); );

View file

@ -86,9 +86,9 @@ impl BodySync {
fn body_sync(&mut self) { fn body_sync(&mut self) {
let horizon = global::cut_through_horizon() as u64; let horizon = global::cut_through_horizon() as u64;
let body_head: chain::Tip = self.chain.head().unwrap(); let body_head = self.chain.head().unwrap();
let header_head: chain::Tip = self.chain.header_head().unwrap(); let header_head = self.chain.header_head().unwrap();
let sync_head: chain::Tip = self.chain.get_sync_head().unwrap(); let sync_head = self.chain.get_sync_head().unwrap();
self.reset(); self.reset();
@ -121,15 +121,16 @@ impl BodySync {
} }
hashes.reverse(); hashes.reverse();
if oldest_height < header_head.height.saturating_sub(horizon) {
debug!("body_sync: cannot sync full blocks earlier than horizon.");
return;
}
let peers = self.peers.more_work_peers();
// if we have 5 peers to sync from then ask for 50 blocks total (peer_count * // if we have 5 peers to sync from then ask for 50 blocks total (peer_count *
// 10) max will be 80 if all 8 peers are advertising more work // 10) max will be 80 if all 8 peers are advertising more work
// also if the chain is already saturated with orphans, throttle // also if the chain is already saturated with orphans, throttle
let peers = if oldest_height < header_head.height.saturating_sub(horizon) {
self.peers.more_work_archival_peers()
} else {
self.peers.more_work_peers()
};
let block_count = cmp::min( let block_count = cmp::min(
cmp::min(100, peers.len() * p2p::SEND_CHANNEL_CAP), cmp::min(100, peers.len() * p2p::SEND_CHANNEL_CAP),
chain::MAX_ORPHAN_SIZE.saturating_sub(self.chain.orphans_len()) + 1, chain::MAX_ORPHAN_SIZE.saturating_sub(self.chain.orphans_len()) + 1,

View file

@ -32,7 +32,6 @@ pub struct StateSync {
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
archive_mode: bool,
prev_fast_sync: Option<DateTime<Utc>>, prev_fast_sync: Option<DateTime<Utc>>,
fast_sync_peer: Option<Arc<Peer>>, fast_sync_peer: Option<Arc<Peer>>,
@ -43,13 +42,11 @@ impl StateSync {
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
archive_mode: bool,
) -> StateSync { ) -> StateSync {
StateSync { StateSync {
sync_state, sync_state,
peers, peers,
chain, chain,
archive_mode,
prev_fast_sync: None, prev_fast_sync: None,
fast_sync_peer: None, fast_sync_peer: None,
} }
@ -64,8 +61,8 @@ impl StateSync {
head: &chain::Tip, head: &chain::Tip,
highest_height: u64, highest_height: u64,
) -> bool { ) -> bool {
let need_state_sync = !self.archive_mode let need_state_sync =
&& highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64; highest_height.saturating_sub(head.height) > global::cut_through_horizon() as u64;
if !need_state_sync { if !need_state_sync {
return false; return false;
} }

View file

@ -29,13 +29,12 @@ pub fn run_sync(
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
archive_mode: bool,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
) { ) {
let _ = thread::Builder::new() let _ = thread::Builder::new()
.name("sync".to_string()) .name("sync".to_string())
.spawn(move || { .spawn(move || {
let runner = SyncRunner::new(sync_state, peers, chain, archive_mode, stop); let runner = SyncRunner::new(sync_state, peers, chain, stop);
runner.sync_loop(); runner.sync_loop();
}); });
} }
@ -44,7 +43,6 @@ pub struct SyncRunner {
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
archive_mode: bool,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
} }
@ -53,14 +51,12 @@ impl SyncRunner {
sync_state: Arc<SyncState>, sync_state: Arc<SyncState>,
peers: Arc<p2p::Peers>, peers: Arc<p2p::Peers>,
chain: Arc<chain::Chain>, chain: Arc<chain::Chain>,
archive_mode: bool,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
) -> SyncRunner { ) -> SyncRunner {
SyncRunner { SyncRunner {
sync_state, sync_state,
peers, peers,
chain, chain,
archive_mode,
stop, stop,
} }
} }
@ -118,7 +114,6 @@ impl SyncRunner {
self.sync_state.clone(), self.sync_state.clone(),
self.peers.clone(), self.peers.clone(),
self.chain.clone(), self.chain.clone(),
self.archive_mode,
); );
// Highest height seen on the network, generally useful for a fast test on // Highest height seen on the network, generally useful for a fast test on