initial support for kernel data download ()

* initial support for kernel data download

* fix vec backend for tests

* cleanup after rebase
This commit is contained in:
Antioch Peverell 2019-05-14 17:17:38 +01:00 committed by GitHub
parent ff1c55193f
commit e56cd55980
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 280 additions and 10 deletions

25
Cargo.lock generated
View file

@ -786,6 +786,7 @@ dependencies = [
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -869,6 +870,7 @@ dependencies = [
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1784,6 +1786,14 @@ dependencies = [
"ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "remove_dir_all"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ring"
version = "0.13.5"
@ -2025,6 +2035,19 @@ dependencies = [
"unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tempfile"
version = "3.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)",
"remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "term"
version = "0.5.2"
@ -2749,6 +2772,7 @@ dependencies = [
"checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828"
"checksum regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "559008764a17de49a3146b234641644ed37d118d1ef641a0bb573d146edc6ce0"
"checksum regex-syntax 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "dcfd8681eebe297b81d98498869d4aae052137651ad7b96822f09ceb690d0a96"
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
"checksum ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2c4db68a2e35f3497146b7e4563df7d4773a2433230c5e4b448328e31740458a"
"checksum ripemd160 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "482aa56cc68aaeccdaaff1cc5a72c247da8bbad3beb174ca5741f274c22883fb"
"checksum rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ccc78bfd5acd7bf3e89cffcf899e5cb1a52d6fafa8dec2739ad70c9577a57288"
@ -2781,6 +2805,7 @@ dependencies = [
"checksum syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)" = "261ae9ecaa397c42b960649561949d69311f08eeaea86a65696e6e46517cf741"
"checksum syn 0.15.31 (registry+https://github.com/rust-lang/crates.io-index)" = "d2b4cfac95805274c6afdb12d8f770fa2d27c045953e7b630a81801953699a9a"
"checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015"
"checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a"
"checksum term 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "edd106a334b7657c10b7c540a0106114feadeb4dc314513e97df481d5d966f42"
"checksum term_size 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9e5b9a66db815dcfd2da92db471106457082577c3c278d4138ab3e3b4e189327"
"checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f"

View file

@ -33,6 +33,7 @@ use self::pool_api::PoolInfoHandler;
use self::pool_api::PoolPushHandler;
use self::server_api::IndexHandler;
use self::server_api::StatusHandler;
use self::server_api::KernelDownloadHandler;
use self::transactions_api::TxHashSetHandler;
use crate::auth::{BasicAuthMiddleware, GRIN_BASIC_REALM};
use crate::chain;
@ -135,6 +136,9 @@ pub fn build_router(
chain: Arc::downgrade(&chain),
peers: Arc::downgrade(&peers),
};
let kernel_download_handler = KernelDownloadHandler {
peers: Arc::downgrade(&peers),
};
let txhashset_handler = TxHashSetHandler {
chain: Arc::downgrade(&chain),
};
@ -165,6 +169,7 @@ pub fn build_router(
router.add_route("/v1/chain/validate", Arc::new(chain_validation_handler))?;
router.add_route("/v1/txhashset/*", Arc::new(txhashset_handler))?;
router.add_route("/v1/status", Arc::new(status_handler))?;
router.add_route("/v1/kerneldownload", Arc::new(kernel_download_handler))?;
router.add_route("/v1/pool", Arc::new(pool_info_handler))?;
router.add_route("/v1/pool/push", Arc::new(pool_push_handler))?;
router.add_route("/v1/peers/all", Arc::new(peers_all_handler))?;

View file

@ -19,7 +19,7 @@ use crate::rest::*;
use crate::router::{Handler, ResponseFuture};
use crate::types::*;
use crate::web::*;
use hyper::{Body, Request};
use hyper::{Body, Request, StatusCode};
use std::sync::Weak;
// RESTful index of available api endpoints
@ -36,6 +36,29 @@ impl Handler for IndexHandler {
}
}
pub struct KernelDownloadHandler {
pub peers: Weak<p2p::Peers>,
}
impl Handler for KernelDownloadHandler {
fn post(&self, _req: Request<Body>) -> ResponseFuture {
if let Some(peer) = w_fut!(&self.peers).most_work_peer() {
match peer.send_kernel_data_request() {
Ok(_) => response(StatusCode::OK, "{}"),
Err(e) => response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("requesting kernel data from peer failed: {:?}", e),
),
}
} else {
response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("requesting kernel data from peer failed (no peers)"),
)
}
}
}
/// Status handler. Post a summary of the server status
/// GET /v1/status
pub struct StatusHandler {

View file

@ -23,6 +23,7 @@ use crate::core::core::{
};
use crate::core::global;
use crate::core::pow;
use crate::core::ser::{Readable, StreamingReader};
use crate::error::{Error, ErrorKind};
use crate::pipe;
use crate::store;
@ -36,6 +37,7 @@ use crate::util::{Mutex, RwLock, StopState};
use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::Read;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@ -659,6 +661,28 @@ impl Chain {
self.txhashset.read().roots()
}
/// Provides a reading view into the current kernel state.
pub fn kernel_data_read(&self) -> Result<File, Error> {
let txhashset = self.txhashset.read();
txhashset::rewindable_kernel_view(&txhashset, |view| view.kernel_data_read())
}
/// Writes kernels provided to us (via a kernel data download).
/// Currently does not write these to disk and simply deserializes
/// the provided data.
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
let mut count = 0;
let mut stream = StreamingReader::new(reader, Duration::from_secs(1));
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
count += 1;
}
debug!("kernel_data_write: read {} kernels", count);
Ok(())
}
/// Provides a reading view into the current txhashset state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.

View file

@ -14,6 +14,8 @@
//! Lightweight readonly view into kernel MMR for convenience.
use std::fs::File;
use crate::core::core::pmmr::RewindablePMMR;
use crate::core::core::{BlockHeader, TxKernel};
use crate::error::{Error, ErrorKind};
@ -78,4 +80,14 @@ impl<'a> RewindableKernelView<'a> {
}
Ok(())
}
/// Read the "raw" kernel backend data file (via temp file for consistent view on data).
pub fn kernel_data_read(&self) -> Result<File, Error> {
let file = self
.pmmr
.backend()
.data_as_temp_file()
.map_err(|_| ErrorKind::FileReadErr("Data file woes".into()))?;
Ok(file)
}
}

View file

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use croaring::Bitmap;
use crate::core::hash::Hash;
@ -59,6 +61,11 @@ pub trait Backend<T: PMMRable> {
/// triggered removal).
fn remove(&mut self, position: u64) -> Result<(), String>;
/// Creates a temp file containing the contents of the underlying data file
/// from the backend storage. This allows a caller to see a consistent view
/// of the data without needing to lock the backend storage.
fn data_as_temp_file(&self) -> Result<File, String>;
/// Release underlying datafiles and locks
fn release_files(&mut self);

View file

@ -49,6 +49,11 @@ where
}
}
/// Reference to the underlying storage backend.
pub fn backend(&'a self) -> &Backend<T> {
self.backend
}
/// Build a new readonly PMMR pre-initialized to
/// last_pos with the provided backend.
pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'_, T, B> {

View file

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fs::File;
use self::core::core::hash::{DefaultHashable, Hash};
use self::core::core::pmmr::{self, Backend};
use self::core::core::BlockHeader;
@ -103,6 +105,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
Some(data.as_elmt())
}
fn data_as_temp_file(&self) -> Result<File, String> {
unimplemented!()
}
fn leaf_pos_iter(&self) -> Box<Iterator<Item = u64> + '_> {
unimplemented!()
}

View file

@ -18,6 +18,7 @@ num = "0.1"
rand = "0.6"
serde = "1"
serde_derive = "1"
tempfile = "3.0.5"
log = "0.4"
chrono = { version = "0.4.4", features = ["serde"] }

View file

@ -74,6 +74,8 @@ enum_from_primitive! {
BanReason = 18,
GetTransaction = 19,
TransactionKernel = 20,
KernelDataRequest = 21,
KernelDataResponse = 22,
}
}
@ -111,6 +113,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::BanReason => 64,
Type::GetTransaction => 32,
Type::TransactionKernel => 32,
Type::KernelDataRequest => 0,
Type::KernelDataResponse => 8,
}
}
@ -714,3 +718,30 @@ impl Readable for TxHashSetArchive {
})
}
}
pub struct KernelDataRequest {}
impl Writeable for KernelDataRequest {
fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), ser::Error> {
Ok(())
}
}
pub struct KernelDataResponse {
/// Size in bytes of the attached kernel data file.
pub bytes: u64,
}
impl Writeable for KernelDataResponse {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u64(self.bytes)?;
Ok(())
}
}
impl Readable for KernelDataResponse {
fn read(reader: &mut dyn Reader) -> Result<KernelDataResponse, ser::Error> {
let bytes = reader.read_u64()?;
Ok(KernelDataResponse { bytes })
}
}

View file

@ -15,6 +15,7 @@
use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
@ -26,7 +27,9 @@ use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type};
use crate::msg::{
self, BanReason, GetPeerAddrs, KernelDataRequest, Locator, Ping, TxHashSetRequest, Type,
};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
@ -379,6 +382,13 @@ impl Peer {
)
}
pub fn send_kernel_data_request(&self) -> Result<(), Error> {
debug!("Asking {} for kernel data.", self.info.addr);
self.connection
.lock()
.send(&KernelDataRequest {}, msg::Type::KernelDataRequest)
}
/// Stops the peer, closing its connection
pub fn stop(&self) {
self.connection.lock().close();
@ -521,6 +531,14 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_block(h)
}
fn kernel_data_read(&self) -> Result<File, chain::Error> {
self.adapter.kernel_data_read()
}
fn kernel_data_write(&self, reader: &mut Read) -> Result<bool, chain::Error> {
self.adapter.kernel_data_write(reader)
}
fn txhashset_read(&self, h: Hash) -> Option<TxHashSetRead> {
self.adapter.txhashset_read(h)
}

View file

@ -15,6 +15,7 @@
use crate::util::RwLock;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
@ -594,6 +595,14 @@ impl ChainAdapter for Peers {
self.adapter.get_block(h)
}
fn kernel_data_read(&self) -> Result<File, chain::Error> {
self.adapter.kernel_data_read()
}
fn kernel_data_write(&self, reader: &mut Read) -> Result<bool, chain::Error> {
self.adapter.kernel_data_write(reader)
}
fn txhashset_read(&self, h: Hash) -> Option<TxHashSetRead> {
self.adapter.txhashset_read(h)
}

View file

@ -15,17 +15,19 @@
use rand::{thread_rng, Rng};
use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Write};
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::sync::Arc;
use chrono::prelude::Utc;
use tempfile::tempfile;
use crate::conn::{Message, MessageHandler, Response};
use crate::core::core::{self, hash::Hash, CompactBlock};
use crate::util::{RateCounter, RwLock};
use chrono::prelude::Utc;
use crate::msg::{
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive,
TxHashSetRequest, Type,
BanReason, GetPeerAddrs, Headers, KernelDataResponse, Locator, PeerAddrs, Ping, Pong,
TxHashSetArchive, TxHashSetRequest, Type,
};
use crate::types::{Error, NetAdapter, PeerInfo};
@ -244,6 +246,54 @@ impl MessageHandler for Protocol {
Ok(None)
}
Type::KernelDataRequest => {
debug!("handle_payload: kernel_data_request");
let kernel_data = self.adapter.kernel_data_read()?;
let bytes = kernel_data.metadata()?.len();
let kernel_data_response = KernelDataResponse { bytes };
let mut response =
Response::new(Type::KernelDataResponse, &kernel_data_response, writer)?;
response.add_attachment(kernel_data);
Ok(Some(response))
}
Type::KernelDataResponse => {
let response: KernelDataResponse = msg.body()?;
debug!(
"handle_payload: kernel_data_response: bytes: {}",
response.bytes
);
let mut writer = BufWriter::new(tempfile()?);
let total_size = response.bytes as usize;
let mut remaining_size = total_size;
while remaining_size > 0 {
let size = msg.copy_attachment(remaining_size, &mut writer)?;
remaining_size = remaining_size.saturating_sub(size);
// Increase received bytes quietly (without affecting the counters).
// Otherwise we risk banning a peer as "abusive".
received_bytes.write().inc_quiet(size as u64);
}
// Remember to seek back to start of the file as the caller is likely
// to read this file directly without reopening it.
writer.seek(SeekFrom::Start(0))?;
let mut file = writer.into_inner().map_err(|_| Error::Internal)?;
debug!(
"handle_payload: kernel_data_response: file size: {}",
file.metadata().unwrap().len()
);
self.adapter.kernel_data_write(&mut file)?;
Ok(None)
}
Type::TxHashSetRequest => {
let sm_req: TxHashSetRequest = msg.body()?;
debug!(

View file

@ -13,11 +13,12 @@
// limitations under the License.
use std::fs::File;
use std::io::{self, Read};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::{io, thread};
use crate::chain;
use crate::core::core;
@ -280,6 +281,12 @@ impl ChainAdapter for DummyAdapter {
fn get_block(&self, _: Hash) -> Option<core::Block> {
None
}
fn kernel_data_read(&self) -> Result<File, chain::Error> {
unimplemented!()
}
fn kernel_data_write(&self, _reader: &mut Read) -> Result<bool, chain::Error> {
unimplemented!()
}
fn txhashset_read(&self, _h: Hash) -> Option<TxHashSetRead> {
unimplemented!()
}

View file

@ -15,7 +15,7 @@
use crate::util::RwLock;
use std::convert::From;
use std::fs::File;
use std::io;
use std::io::{self, Read};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::path::PathBuf;
@ -527,6 +527,10 @@ pub trait ChainAdapter: Sync + Send {
/// Gets a full block by its hash.
fn get_block(&self, h: Hash) -> Option<core::Block>;
fn kernel_data_read(&self) -> Result<File, chain::Error>;
fn kernel_data_write(&self, reader: &mut Read) -> Result<bool, chain::Error>;
/// Provides a reading view into the current txhashset state as well as
/// the required indexes for a consumer to rewind to a consistant state
/// at the provided block hash.

View file

@ -17,6 +17,7 @@
use crate::util::RwLock;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use std::thread;
@ -344,6 +345,16 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
}
fn kernel_data_read(&self) -> Result<File, chain::Error> {
self.chain().kernel_data_read()
}
fn kernel_data_write(&self, reader: &mut Read) -> Result<bool, chain::Error> {
let res = self.chain().kernel_data_write(reader)?;
error!("***** kernel_data_write: {:?}", res);
Ok(true)
}
/// Provides a reading view into the current txhashset state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.

View file

@ -18,6 +18,7 @@ failure = "0.1"
failure_derive = "0.1"
lmdb-zero = "0.4.4"
memmap = "0.7"
tempfile = "3.0.5"
serde = "1"
serde_derive = "1"
log = "0.4"

View file

@ -13,7 +13,8 @@
//! Implementation of the persistent Backend for the prunable MMR tree.
use std::{fs, io, time};
use std::fs::{self, File};
use std::{io, time};
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::pmmr::{self, family, Backend};
@ -139,6 +140,12 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
}
}
fn data_as_temp_file(&self) -> Result<File, String> {
self.data_file
.as_temp_file()
.map_err(|_| format!("Failed to build temp data file"))
}
/// Rewind the PMMR backend to the given position.
fn rewind(&mut self, position: u64, rewind_rm_pos: &Bitmap) -> Result<(), String> {
// First rewind the leaf_set with the necessary added and removed positions.

View file

@ -13,13 +13,14 @@
//! Common storage-related types
use memmap;
use tempfile::tempfile;
use crate::core::ser::{
self, BinWriter, FixedLength, Readable, Reader, StreamingReader, Writeable, Writer,
};
use std::fmt::Debug;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Write};
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
use std::marker;
use std::path::{Path, PathBuf};
use std::time;
@ -144,6 +145,13 @@ where
self.file.path()
}
/// Create a new tempfile containing the contents of this data file.
/// This allows callers to see a consistent view of the data without
/// locking the data file.
pub fn as_temp_file(&self) -> io::Result<File> {
self.file.as_temp_file()
}
/// Drop underlying file handles
pub fn release(&mut self) {
self.file.release();
@ -437,6 +445,22 @@ where
}
}
/// Create a new tempfile containing the contents of this append only file.
/// This allows callers to see a consistent view of the data without
/// locking the append only file.
pub fn as_temp_file(&self) -> io::Result<File> {
let mut reader = BufReader::new(File::open(&self.path)?);
let mut writer = BufWriter::new(tempfile()?);
io::copy(&mut reader, &mut writer)?;
// Remember to seek back to start of the file as the caller is likely
// to read this file directly without reopening it.
writer.seek(SeekFrom::Start(0))?;
let file = writer.into_inner()?;
Ok(file)
}
/// Saves a copy of the current file content, skipping data at the provided
/// prune positions. prune_pos must be ordered.
pub fn save_prune(&mut self, prune_pos: &[u64]) -> io::Result<()> {