feature: txhashset downloading progress display on tui ()

This commit is contained in:
Gary Yu 2018-10-13 06:53:50 +08:00 committed by Ignotus Peverell
parent 3fb4669d0a
commit 5c0eb11a7d
10 changed files with 157 additions and 22 deletions

View file

@ -77,7 +77,7 @@ impl<'a> Message<'a> {
read_body(&self.header, self.conn)
}
pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<(), Error> {
pub fn copy_attachment(&mut self, len: usize, writer: &mut Write) -> Result<usize, Error> {
let mut written = 0;
while written < len {
let read_len = cmp::min(8000, len - written);
@ -91,7 +91,7 @@ impl<'a> Message<'a> {
writer.write_all(&mut buf)?;
written += read_len;
}
Ok(())
Ok(written)
}
/// Respond to the message with the provided message type and body

View file

@ -16,6 +16,7 @@ use std::fs::File;
use std::net::{SocketAddr, TcpStream};
use std::sync::{Arc, RwLock};
use chrono::prelude::{DateTime, Utc};
use conn;
use core::core;
use core::core::hash::{Hash, Hashed};
@ -466,6 +467,16 @@ impl ChainAdapter for TrackingAdapter {
fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: SocketAddr) -> bool {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
}
fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}
}
impl NetAdapter for TrackingAdapter {

View file

@ -569,6 +569,16 @@ impl ChainAdapter for Peers {
true
}
}
fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}
}
impl NetAdapter for Peers {

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter};
@ -19,6 +20,7 @@ use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;
use std::time;
use chrono::prelude::Utc;
use conn::{Message, MessageHandler, Response};
use core::core::{self, hash::Hash, CompactBlock};
use core::{global, ser};
@ -255,11 +257,27 @@ impl MessageHandler for Protocol {
);
return Err(Error::BadMessage);
}
let download_start_time = Utc::now();
self.adapter
.txhashset_download_update(download_start_time, 0, sm_arch.bytes);
let mut tmp = env::temp_dir();
tmp.push("txhashset.zip");
let mut save_txhashset_to_file = |file| -> Result<(), Error> {
let mut tmp_zip = BufWriter::new(File::create(file)?);
msg.copy_attachment(sm_arch.bytes as usize, &mut tmp_zip)?;
let total_size = sm_arch.bytes as usize;
let mut downloaded_size: usize = 0;
let mut request_size = 48_000;
while request_size > 0 {
downloaded_size += msg.copy_attachment(request_size, &mut tmp_zip)?;
request_size = cmp::min(48_000, total_size - downloaded_size);
self.adapter.txhashset_download_update(
download_start_time,
downloaded_size as u64,
total_size as u64,
);
}
tmp_zip.into_inner().unwrap().sync_all()?;
Ok(())
};

View file

@ -21,6 +21,7 @@ use std::{io, thread};
use lmdb;
use chrono::prelude::{DateTime, Utc};
use core::core;
use core::core::hash::Hash;
use core::pow::Difficulty;
@ -266,6 +267,15 @@ impl ChainAdapter for DummyAdapter {
fn txhashset_write(&self, _h: Hash, _txhashset_data: File, _peer_addr: SocketAddr) -> bool {
false
}
fn txhashset_download_update(
&self,
_start_time: DateTime<Utc>,
_downloaded_size: u64,
_total_size: u64,
) -> bool {
false
}
}
impl NetAdapter for DummyAdapter {

View file

@ -365,6 +365,14 @@ pub trait ChainAdapter: Sync + Send {
/// state data.
fn txhashset_receive_ready(&self) -> bool;
/// Update txhashset downloading progress
fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool;
/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be

View file

@ -22,6 +22,7 @@ use std::thread;
use std::time::Instant;
use chain::{self, ChainAdapter, Options, Tip};
use chrono::prelude::{DateTime, Utc};
use common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
use core::core::hash::{Hash, Hashed};
use core::core::transaction::Transaction;
@ -327,7 +328,29 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}
fn txhashset_receive_ready(&self) -> bool {
self.sync_state.status() == SyncStatus::TxHashsetDownload
match self.sync_state.status() {
SyncStatus::TxHashsetDownload { .. } => true,
_ => false,
}
}
fn txhashset_download_update(
&self,
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
) -> bool {
match self.sync_state.status() {
SyncStatus::TxHashsetDownload { .. } => {
self.sync_state
.update_txhashset_download(SyncStatus::TxHashsetDownload {
start_time,
downloaded_size,
total_size,
})
}
_ => false,
}
}
/// Writes a reading view on a txhashset state that's been provided to us.
@ -336,7 +359,8 @@ impl p2p::ChainAdapter for NetToChainAdapter {
/// rewound to the provided indexes.
fn txhashset_write(&self, h: Hash, txhashset_data: File, _peer_addr: SocketAddr) -> bool {
// check status again after download, in case 2 txhashsets made it somehow
if self.sync_state.status() != SyncStatus::TxHashsetDownload {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else {
return true;
}

View file

@ -18,6 +18,7 @@ use std::sync::{Arc, RwLock};
use api;
use chain;
use chrono::prelude::{DateTime, Utc};
use core::global::ChainTypes;
use core::{core, pow};
use p2p;
@ -255,7 +256,11 @@ pub enum SyncStatus {
highest_height: u64,
},
/// Downloading the various txhashsets
TxHashsetDownload,
TxHashsetDownload {
start_time: DateTime<Utc>,
downloaded_size: u64,
total_size: u64,
},
/// Setting up before validation
TxHashsetSetup,
/// Validating the full state
@ -316,6 +321,17 @@ impl SyncState {
*status = new_status;
}
/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, new_status: SyncStatus) -> bool {
if let SyncStatus::TxHashsetDownload { .. } = new_status {
let mut status = self.current.write().unwrap();
*status = new_status;
true
} else {
false
}
}
/// Communicate sync error
pub fn set_sync_error(&self, error: Error) {
*self.sync_error.write().unwrap() = Some(error);

View file

@ -88,12 +88,14 @@ impl StateSync {
// check peer connection status of this sync
if let Some(ref peer) = self.fast_sync_peer {
if !peer.is_connected() && SyncStatus::TxHashsetDownload == self.sync_state.status() {
sync_need_restart = true;
info!(
LOGGER,
"fast_sync: peer connection lost: {:?}. restart", peer.info.addr,
);
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
LOGGER,
"fast_sync: peer connection lost: {:?}. restart", peer.info.addr,
);
}
}
}
@ -106,13 +108,15 @@ impl StateSync {
if header_head.height == highest_height {
let (go, download_timeout) = self.fast_sync_due();
if download_timeout && SyncStatus::TxHashsetDownload == self.sync_state.status() {
error!(
LOGGER,
"fast_sync: TxHashsetDownload status timeout in 10 minutes!"
);
self.sync_state
.set_sync_error(Error::P2P(p2p::Error::Timeout));
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!(
LOGGER,
"fast_sync: TxHashsetDownload status timeout in 10 minutes!"
);
self.sync_state
.set_sync_error(Error::P2P(p2p::Error::Timeout));
}
}
if go {
@ -136,7 +140,11 @@ impl StateSync {
}
}
self.sync_state.update(SyncStatus::TxHashsetDownload);
self.sync_state.update(SyncStatus::TxHashsetDownload {
start_time: Utc::now(),
downloaded_size: 0,
total_size: 0,
});
}
}
true

View file

@ -14,6 +14,7 @@
//! Basic status view definition
use chrono::prelude::Utc;
use cursive::direction::Orientation;
use cursive::traits::Identifiable;
use cursive::view::View;
@ -26,6 +27,8 @@ use tui::types::TUIStatusListener;
use servers::common::types::SyncStatus;
use servers::ServerStats;
const NANO_TO_MILLIS: f64 = 1.0 / 1_000_000.0;
pub struct TUIStatusView;
impl TUIStatusListener for TUIStatusView {
@ -101,8 +104,35 @@ impl TUIStatusListener for TUIStatusView {
};
format!("Downloading headers: {}%, step 1/4", percent)
}
SyncStatus::TxHashsetDownload => {
"Downloading chain state for fast sync, step 2/4".to_string()
SyncStatus::TxHashsetDownload {
start_time,
downloaded_size,
total_size,
} => {
if total_size > 0 {
let percent = if total_size > 0 {
downloaded_size * 100 / total_size
} else {
0
};
let start = start_time.timestamp_nanos();
let fin = Utc::now().timestamp_nanos();
let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS;
format!("Downloading {}(MB) chain state for fast sync: {}% at {:.1?}(kB/s), step 2/4",
total_size / 1_000_000,
percent,
if dur_ms > 1.0f64 { downloaded_size as f64 / dur_ms as f64 } else { 0f64 },
)
} else {
let start = start_time.timestamp_millis();
let fin = Utc::now().timestamp_millis();
let dur_secs = (fin - start) / 1000;
format!("Downloading chain state for fast sync. Waiting remote peer to start: {}s, step 2/4",
dur_secs,
)
}
}
SyncStatus::TxHashsetSetup => {
"Preparing chain state for validation, step 3/4".to_string()