grin/servers/src/common/adapters.rs

// Copyright 2018 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Adapters connecting new block, new transaction, and accepted transaction
//! events to consumers of those events.
use std::fs::File;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::{Arc, RwLock, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant;
use rand;
use rand::Rng;
use chain::{self, ChainAdapter, Options, Tip};
use core::core;
use core::core::block::BlockHeader;
use core::core::hash::{Hash, Hashed};
use core::core::target::Difficulty;
use core::core::transaction::{Input, OutputIdentifier};
use p2p;
use pool;
use util::OneTime;
use store;
use common::types::{ChainValidationMode, ServerConfig};
use util::LOGGER;
// All adapters use `Weak` references instead of `Arc` to avoid cycles that
// can never be destroyed. These 2 functions are simple helpers to reduce the
// boilerplate of dealing with `Weak`.
fn w<T>(weak: &Weak<T>) -> Arc<T> {
weak.upgrade().unwrap()
}
fn wo<T>(weak_one: &OneTime<Weak<T>>) -> Arc<T> {
w(weak_one.borrow().deref())
}
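// A minimal sketch of the `Weak` pattern described above, kept separate from
// the adapters themselves. It shows that a `Weak` handle can be upgraded only
// while some `Arc` owner is still alive. `Component`, `try_upgrade` and
// `weak_demo` are hypothetical names used for illustration only; they are not
// part of this crate. Unlike `w`, the helper here returns `None` rather than
// panicking when the target has already been dropped.
```rust
use std::sync::{Arc, Weak};

// Hypothetical stand-in for a long-lived component such as the chain.
struct Component {
    name: String,
}

// Same shape as the `w` helper, but surfaces a dropped target as `None`
// instead of unwrapping.
fn try_upgrade<T>(weak: &Weak<T>) -> Option<Arc<T>> {
    weak.upgrade()
}

// Creates an owner, hands out a weak reference, and reports whether that
// reference can be upgraded before and after the owner is dropped.
fn weak_demo() -> (bool, bool) {
    let owner = Arc::new(Component {
        name: "chain".to_string(),
    });
    let weak = Arc::downgrade(&owner);

    // The temporary `Arc` produced by the upgrade is released at the end of
    // this statement, so it does not keep the component alive.
    let alive = try_upgrade(&weak)
        .map(|c| c.name == "chain")
        .unwrap_or(false);

    // Dropping the only strong reference destroys the component.
    drop(owner);
    let after_drop = try_upgrade(&weak).is_some();

    (alive, after_drop)
}
```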
/// Implementation of the NetAdapter for the blockchain. Gets notified when new
/// blocks and transactions are received and forwards to the chain and pool
/// implementations.
pub struct NetToChainAdapter {
currently_syncing: Arc<AtomicBool>,
archive_mode: bool,
chain: Weak<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peers: OneTime<Weak<p2p::Peers>>,
config: ServerConfig,
}
impl p2p::ChainAdapter for NetToChainAdapter {
fn total_difficulty(&self) -> Difficulty {
w(&self.chain).total_difficulty()
}
fn total_height(&self) -> u64 {
w(&self.chain).head().unwrap().height
}
fn transaction_received(&self, tx: core::Transaction, stem: bool) {
let source = pool::TxSource {
debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(),
};
debug!(
LOGGER,
"Received tx {} from {}, going to process.",
tx.hash(),
source.identifier,
);
let h = tx.hash();
if !stem && tx.kernels.len() != 1 {
debug!(
LOGGER,
"Received regular multi-kernel transaction, will attempt to deaggregate"
);
if let Err(e) = self.tx_pool
.write()
.unwrap()
.deaggregate_and_add_to_memory_pool(source, tx, stem)
{
debug!(LOGGER, "Transaction {} rejected: {:?}", h, e);
}
} else {
if let Err(e) = self.tx_pool
.write()
.unwrap()
.add_to_memory_pool(source, tx, stem)
{
debug!(LOGGER, "Transaction {} rejected: {:?}", h, e);
}
}
}
fn block_received(&self, b: core::Block, addr: SocketAddr) -> bool {
debug!(
LOGGER,
"Received block {} at {} from {}, going to process.",
b.hash(),
b.header.height,
addr,
);
self.process_block(b, addr)
}
fn compact_block_received(&self, cb: core::CompactBlock, addr: SocketAddr) -> bool {
let bhash = cb.hash();
debug!(
LOGGER,
"Received compact_block {} at {} from {}, going to process.",
bhash,
cb.header.height,
addr,
);
if cb.kern_ids.is_empty() {
let block = core::Block::hydrate_from(cb, vec![]);
// push the freshly hydrated block through the chain pipeline
self.process_block(block, addr)
} else {
// TODO - do we need to validate the header here?
let txs = {
let tx_pool = self.tx_pool.read().unwrap();
tx_pool.retrieve_transactions(&cb)
};
debug!(LOGGER, "adapter: txs from tx pool - {}", txs.len(),);
// TODO - 3 scenarios here -
// 1) we hydrate a valid block (good to go)
// 2) we hydrate an invalid block (txs legit missing from our pool)
// 3) we hydrate an invalid block (peer sent us a "bad" compact block) - [TBD]
let block = core::Block::hydrate_from(cb.clone(), txs);
let chain = self.chain
.upgrade()
.expect("failed to upgrade weak ref to chain");
if let Ok(sums) = chain.get_block_sums(&cb.header.previous) {
if block.validate(&sums.output_sum, &sums.kernel_sum).is_ok() {
debug!(LOGGER, "adapter: successfully hydrated block from tx pool!");
self.process_block(block, addr)
} else {
debug!(
LOGGER,
"adapter: block invalid after hydration, requesting full block"
);
self.request_block(&cb.header, &addr);
true
}
} else {
debug!(
LOGGER,
"adapter: failed to retrieve previous block header (still syncing?)"
);
true
}
}
}
fn header_received(&self, bh: core::BlockHeader, addr: SocketAddr) -> bool {
let bhash = bh.hash();
debug!(
LOGGER,
"Received block header {} at {} from {}, going to process.", bhash, bh.height, addr,
);
// pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header
let res = w(&self.chain).process_block_header(&bh, self.chain_opts());
if let &Err(ref e) = &res {
debug!(LOGGER, "Block header {} refused by chain: {:?}", bhash, e);
if e.is_bad_data() {
debug!(
LOGGER,
"header_received: {} is a bad header, resetting header head", bhash
);
let _ = w(&self.chain).reset_head();
return false;
} else {
// we got an error when trying to process the block header
// but nothing serious enough to need to ban the peer upstream
return true;
}
}
// we have successfully processed a block header
// so we can go request the block itself
self.request_compact_block(&bh, &addr);
// done receiving the header
true
}
fn headers_received(&self, bhs: Vec<core::BlockHeader>, addr: SocketAddr) {
info!(
LOGGER,
"Received block headers {:?} from {}",
bhs.iter().map(|x| x.hash()).collect::<Vec<_>>(),
addr,
);
// try to add each header to our header chain
let mut added_hs = vec![];
for bh in bhs {
let res = w(&self.chain).sync_block_header(&bh, self.chain_opts());
match res {
Ok(_) => {
added_hs.push(bh.hash());
}
Err(chain::Error::Unfit(s)) => {
info!(
LOGGER,
"Received unfit block header {} at {}: {}.",
bh.hash(),
bh.height,
s
);
}
Err(chain::Error::StoreErr(e, explanation)) => {
error!(
LOGGER,
"Store error processing block header {}: in {} {:?}",
bh.hash(),
explanation,
e
);
return;
}
Err(e) => {
info!(LOGGER, "Invalid block header {}: {:?}.", bh.hash(), e);
// TODO penalize peer somehow
}
}
}
let header_head = w(&self.chain).get_header_head().unwrap();
info!(
LOGGER,
"Added {} headers to the header chain. Last: {} at {}.",
added_hs.len(),
header_head.last_block_h,
header_head.height,
);
}
fn locate_headers(&self, locator: Vec<Hash>) -> Vec<core::BlockHeader> {
debug!(LOGGER, "locate_headers: {:?}", locator,);
let header = match self.find_common_header(locator) {
Some(header) => header,
None => return vec![],
};
debug!(LOGGER, "locate_headers: common header: {:?}", header.hash(),);
// looks like we know one, getting as many following headers as allowed
let hh = header.height;
let mut headers = vec![];
for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) {
let header = w(&self.chain).get_header_by_height(h);
match header {
Ok(head) => headers.push(head),
Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => break,
Err(e) => {
error!(LOGGER, "Could not build header locator: {:?}", e);
return vec![];
}
}
}
debug!(
LOGGER,
"locate_headers: returning headers: {}",
headers.len(),
);
headers
}
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block> {
let b = w(&self.chain).get_block(&h);
match b {
Ok(b) => Some(b),
_ => None,
}
}
/// Provides a reading view into the current txhashset state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
fn txhashset_read(&self, h: Hash) -> Option<p2p::TxHashSetRead> {
match w(&self.chain).txhashset_read(h.clone()) {
Ok((out_index, kernel_index, read)) => Some(p2p::TxHashSetRead {
output_index: out_index,
kernel_index: kernel_index,
reader: read,
}),
Err(e) => {
warn!(
LOGGER,
"Couldn't produce txhashset data for block {}: {:?}", h, e
);
None
}
}
}
/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
/// rewound to the provided indexes.
fn txhashset_write(
&self,
h: Hash,
rewind_to_output: u64,
rewind_to_kernel: u64,
txhashset_data: File,
_peer_addr: SocketAddr,
) -> bool {
// TODO check whether we should accept any txhashset now
if let Err(e) =
w(&self.chain).txhashset_write(h, rewind_to_output, rewind_to_kernel, txhashset_data)
{
error!(LOGGER, "Failed to save txhashset archive: {:?}", e);
!e.is_bad_data()
} else {
info!(LOGGER, "Received valid txhashset data for {}.", h);
self.currently_syncing.store(true, Ordering::Relaxed);
true
}
}
}
impl NetToChainAdapter {
/// Construct a new NetToChainAdapter instance
pub fn new(
currently_syncing: Arc<AtomicBool>,
archive_mode: bool,
chain_ref: Weak<chain::Chain>,
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
config: ServerConfig,
) -> NetToChainAdapter {
NetToChainAdapter {
currently_syncing,
archive_mode,
chain: chain_ref,
tx_pool,
peers: OneTime::new(),
config,
}
}
/// Initialize a NetToChainAdapter with reference to a Peers object.
/// Should only be called once.
pub fn init(&self, peers: Weak<p2p::Peers>) {
self.peers.init(peers);
}
// Recursively walk back through the locator vector and stop at the first
// header we recognize; this will be a header shared in common between us
// and the peer.
fn find_common_header(&self, locator: Vec<Hash>) -> Option<BlockHeader> {
if locator.is_empty() {
return None;
}
let chain = w(&self.chain);
let known = chain.get_block_header(&locator[0]);
match known {
Ok(header) => {
// even if we know the block, it may not be on our winning chain
let known_winning = chain.get_header_by_height(header.height);
if let Ok(known_winning) = known_winning {
if known_winning.hash() != header.hash() {
self.find_common_header(locator[1..].to_vec())
} else {
Some(header)
}
} else {
self.find_common_header(locator[1..].to_vec())
}
}
Err(chain::Error::StoreErr(store::Error::NotFoundErr, _)) => {
self.find_common_header(locator[1..].to_vec())
}
Err(e) => {
error!(LOGGER, "Could not build header locator: {:?}", e);
None
}
}
}
// pushing the new block through the chain pipeline
// remembering to reset the head if we have a bad block
fn process_block(&self, b: core::Block, addr: SocketAddr) -> bool {
let prev_hash = b.header.previous;
let bhash = b.hash();
let chain = w(&self.chain);
match chain.process_block(b, self.chain_opts()) {
Ok((tip, _)) => {
self.validate_chain(bhash);
self.check_compact(tip);
true
}
Err(chain::Error::Orphan) => {
// make sure we did not miss the parent block
if !chain.is_orphan(&prev_hash) && !self.currently_syncing.load(Ordering::Relaxed) {
debug!(LOGGER, "adapter: process_block: received an orphan block, checking the parent: {:}", prev_hash);
self.request_block_by_hash(prev_hash, &addr)
}
true
}
Err(ref e) if e.is_bad_data() => {
debug!(
LOGGER,
"adapter: process_block: {} is a bad block, resetting head", bhash
);
let _ = chain.reset_head();
// we potentially changed the state of the system here
// so check everything is still ok
self.validate_chain(bhash);
false
}
Err(e) => {
debug!(
LOGGER,
"adapter: process_block: block {} refused by chain: {:?}", bhash, e
);
true
}
}
}
fn validate_chain(&self, bhash: Hash) {
// If we are running in "validate the full chain every block" mode then
// panic here if validation fails for any reason.
// We are out of consensus at this point and want to track the problem
// down as soon as possible.
// Skip this if we are currently syncing (too slow).
let chain = w(&self.chain);
if chain.head().unwrap().height > 0 && !self.currently_syncing.load(Ordering::Relaxed)
&& self.config.chain_validation_mode == ChainValidationMode::EveryBlock
{
let now = Instant::now();
debug!(
LOGGER,
"adapter: process_block: ***** validating full chain state at {}", bhash,
);
let chain = w(&self.chain);
chain
.validate(true)
.expect("chain validation failed, hard stop");
debug!(
LOGGER,
"adapter: process_block: ***** done validating full chain state, took {}s",
now.elapsed().as_secs(),
);
}
}
fn check_compact(&self, tip_res: Option<Tip>) {
// no compaction during sync or if we're in archive (historical) mode
if self.archive_mode || self.currently_syncing.load(Ordering::Relaxed) {
return;
}
if let Some(tip) = tip_res {
// trigger compaction every 2000 blocks, uses a different thread to avoid
// blocking the caller thread (likely a peer)
if tip.height % 2000 == 0 {
let chain = w(&self.chain);
let _ = thread::Builder::new()
.name("compactor".to_string())
.spawn(move || {
if let Err(e) = chain.compact() {
error!(LOGGER, "Could not compact chain: {:?}", e);
}
});
}
}
}
// After receiving a compact block, if we cannot successfully hydrate
// it into a full block then fall back to requesting the full block
// from the same peer that gave us the compact block.
// consider additional peers for redundancy?
fn request_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
self.request_block_by_hash(bh.hash(), addr)
}
fn request_block_by_hash(&self, h: Hash, addr: &SocketAddr) {
self.send_block_request_to_peer(h, addr, |peer, h| peer.send_block_request(h))
}
// After we have received a block header in "header first" propagation
// we need to go request the block (compact representation) from the
// same peer that gave us the header (unless we have already accepted the block)
fn request_compact_block(&self, bh: &BlockHeader, addr: &SocketAddr) {
self.send_block_request_to_peer(bh.hash(), addr, |peer, h| {
peer.send_compact_block_request(h)
})
}
fn send_block_request_to_peer<F>(&self, h: Hash, addr: &SocketAddr, f: F)
where
F: Fn(&p2p::Peer, Hash) -> Result<(), p2p::Error>,
{
match w(&self.chain).block_exists(h) {
Ok(false) => {
match wo(&self.peers).get_connected_peer(addr) {
None => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, not connected", addr),
Some(peer) => {
match peer.read() {
Err(e) => debug!(LOGGER, "send_block_request_to_peer: can't send request to peer {:?}, read fails: {:?}", addr, e),
Ok(p) => {
if let Err(e) = f(&p, h) {
error!(LOGGER, "send_block_request_to_peer: failed: {:?}", e)
}
}
}
}
}
}
Ok(true) => debug!(LOGGER, "send_block_request_to_peer: block {} already known", h),
Err(e) => error!(LOGGER, "send_block_request_to_peer: failed to check block exists: {:?}", e)
}
}
/// Prepare options for the chain pipeline
fn chain_opts(&self) -> chain::Options {
if self.currently_syncing.load(Ordering::Relaxed) {
chain::Options::SYNC
} else {
chain::Options::NONE
}
}
}
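// Illustrative sketch only, not part of the original adapter.
// `find_common_header` above recurses and copies the tail of the locator at
// every step. Assuming the two chain lookups can be modelled as plain
// closures, the same search can be written as a single pass over a slice.
// Everything here is hypothetical: hashes are modelled as `u64`,
// `lookup_height` stands in for `get_block_header` and `winning_hash_at`
// for `get_header_by_height`. Unlike the original, this sketch treats every
// lookup failure as "not found" and keeps scanning.
#[allow(dead_code)]
fn first_common_hash_sketch<L, W>(
	locator: &[u64],
	lookup_height: L,
	winning_hash_at: W,
) -> Option<u64>
where
	L: Fn(u64) -> Option<u64>,
	W: Fn(u64) -> Option<u64>,
{
	// Keep the first locator entry that we know about AND that sits on our
	// winning chain at its own height.
	locator.iter().cloned().find(|&hash| match lookup_height(hash) {
		Some(height) => winning_hash_at(height) == Some(hash),
		None => false,
	})
}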
/// Implementation of the ChainAdapter for the network. Gets notified when the
/// blockchain accepted a new block, asking the pool to update its state and
/// the network to broadcast the block
pub struct ChainToPoolAndNetAdapter {
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
peers: OneTime<Weak<p2p::Peers>>,
}
impl ChainAdapter for ChainToPoolAndNetAdapter {
fn block_accepted(&self, b: &core::Block, opts: Options) {
debug!(LOGGER, "adapter: block_accepted: {:?}", b.hash());
if let Err(e) = self.tx_pool.write().unwrap().reconcile_block(b) {
error!(
LOGGER,
"Pool could not update itself at block {}: {:?}",
b.hash(),
e,
);
}
// If we mined the block then we want to broadcast the block itself.
// If block is empty then broadcast the block.
// If block contains txs then broadcast the compact block.
// If we received the block from another node then broadcast "header first"
// to minimize network traffic.
if opts.contains(Options::MINE) {
// propagate compact block out if we mined the block
// but broadcast full block if we have no txs
let cb = b.as_compact_block();
if cb.kern_ids.is_empty() {
// in the interest of testing all code paths
// randomly decide how we send an empty block out
// TODO - lock this down once we are comfortable it works...
let mut rng = rand::thread_rng();
if rng.gen() {
wo(&self.peers).broadcast_block(&b);
} else {
wo(&self.peers).broadcast_compact_block(&cb);
}
} else {
wo(&self.peers).broadcast_compact_block(&cb);
}
} else {
// "header first" propagation if we are not the originator of this block
// again randomly choose between "header first" or "compact block" propagation
// to ensure we test a wide variety of code paths
let mut rng = rand::thread_rng();
if rng.gen() {
wo(&self.peers).broadcast_header(&b.header);
} else {
let cb = b.as_compact_block();
wo(&self.peers).broadcast_compact_block(&cb);
}
}
}
}
impl ChainToPoolAndNetAdapter {
/// Construct a ChainToPoolAndNetAdapter instance.
pub fn new(
tx_pool: Arc<RwLock<pool::TransactionPool<PoolToChainAdapter>>>,
) -> ChainToPoolAndNetAdapter {
ChainToPoolAndNetAdapter {
tx_pool,
peers: OneTime::new(),
}
}
/// Initialize a ChainToPoolAndNetAdapter instance with a handle to a Peers object.
/// Should only be called once.
pub fn init(&self, peers: Weak<p2p::Peers>) {
self.peers.init(peers);
}
}
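// Illustrative sketch only, not part of the original adapter.
// The propagation rules in `block_accepted` above reduce to a pure function
// of three inputs, which makes them easy to check in isolation.
// `BroadcastKindSketch` and `choose_broadcast_sketch` are hypothetical names;
// `has_kernels` stands in for `!cb.kern_ids.is_empty()` and `coin_flip` for
// the result of `rng.gen()`.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum BroadcastKindSketch {
	FullBlock,
	CompactBlock,
	HeaderFirst,
}

#[allow(dead_code)]
fn choose_broadcast_sketch(mined: bool, has_kernels: bool, coin_flip: bool) -> BroadcastKindSketch {
	match (mined, has_kernels, coin_flip) {
		// we mined a block that carries txs: always send the compact block
		(true, true, _) => BroadcastKindSketch::CompactBlock,
		// we mined an empty block: randomly send it full or compact
		(true, false, true) => BroadcastKindSketch::FullBlock,
		(true, false, false) => BroadcastKindSketch::CompactBlock,
		// relayed block: randomly "header first" or compact
		(false, _, true) => BroadcastKindSketch::HeaderFirst,
		(false, _, false) => BroadcastKindSketch::CompactBlock,
	}
}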
/// Adapter between the transaction pool and the network, to relay
/// transactions that have been accepted.
pub struct PoolToNetAdapter {
peers: OneTime<Weak<p2p::Peers>>,
}
impl pool::PoolAdapter for PoolToNetAdapter {
fn stem_tx_accepted(&self, tx: &core::Transaction) {
wo(&self.peers).broadcast_stem_transaction(tx);
}
fn tx_accepted(&self, tx: &core::Transaction) {
wo(&self.peers).broadcast_transaction(tx);
}
}
impl PoolToNetAdapter {
/// Create a new pool to net adapter
pub fn new() -> PoolToNetAdapter {
PoolToNetAdapter {
peers: OneTime::new(),
}
}
/// Set up the p2p server on the adapter
pub fn init(&self, peers: Weak<p2p::Peers>) {
self.peers.init(peers);
}
}
/// Implements the view of the blockchain required by the TransactionPool to
/// operate. Mostly needed to break any direct lifecycle or implementation
/// dependency between the pool and the chain.
#[derive(Clone)]
pub struct PoolToChainAdapter {
chain: OneTime<Weak<chain::Chain>>,
}
impl PoolToChainAdapter {
/// Create a new pool adapter
pub fn new() -> PoolToChainAdapter {
PoolToChainAdapter {
chain: OneTime::new(),
}
}
/// Set the pool adapter's chain. Should only be called once.
pub fn set_chain(&self, chain_ref: Weak<chain::Chain>) {
self.chain.init(chain_ref);
}
}
impl pool::BlockChain for PoolToChainAdapter {
fn is_unspent(&self, output_ref: &OutputIdentifier) -> Result<Hash, pool::PoolError> {
wo(&self.chain).is_unspent(output_ref).map_err(|e| match e {
chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound,
chain::types::Error::OutputSpent => pool::PoolError::OutputSpent,
_ => pool::PoolError::GenericPoolError,
})
}
fn is_matured(&self, input: &Input, height: u64) -> Result<(), pool::PoolError> {
wo(&self.chain)
.is_matured(input, height)
.map_err(|e| match e {
chain::types::Error::OutputNotFound => pool::PoolError::OutputNotFound,
_ => pool::PoolError::GenericPoolError,
})
}
fn head_header(&self) -> Result<BlockHeader, pool::PoolError> {
wo(&self.chain)
.head_header()
.map_err(|_| pool::PoolError::GenericPoolError)
}
}
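// Illustrative sketch only, not part of the original adapters.
// The guard in `check_compact` (no compaction in archive mode or while
// syncing, otherwise every 2000 blocks) can be expressed as a pure predicate.
// `COMPACTION_INTERVAL_SKETCH` and `should_compact_sketch` are hypothetical
// names; the constant simply names the literal 2000 used in `check_compact`.
#[allow(dead_code)]
const COMPACTION_INTERVAL_SKETCH: u64 = 2000;

#[allow(dead_code)]
fn should_compact_sketch(archive_mode: bool, syncing: bool, tip_height: Option<u64>) -> bool {
	if archive_mode || syncing {
		return false;
	}
	match tip_height {
		// note that height 0 also satisfies the modulo check, as in the original
		Some(height) => height % COMPACTION_INTERVAL_SKETCH == 0,
		None => false,
	}
}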