From 15ea8da34abf0dc51d0f80fc2134725626f24c8f Mon Sep 17 00:00:00 2001 From: Ignotus Peverell Date: Mon, 3 Jul 2017 16:46:25 -0700 Subject: [PATCH] New Chain struct acting as facade for all chain operations Introduce a new Chain struct that maintains the current head, acts as facade to the store and the block processing pipeline and handles all appropriate locking. All higher level components, like the overall server, REST APIs and miner have been update to only depend on Chain. Cleaned up some duplication and tests now that there's a single entry point to all blockchain operations. --- api/src/endpoints.rs | 47 ++----- chain/src/chain.rs | 224 +++++++++++++++++++++++++++++++ chain/src/lib.rs | 5 +- chain/src/pipe.rs | 111 +++------------ chain/src/store.rs | 4 +- chain/src/types.rs | 82 ++++++++--- chain/tests/mine_simple_chain.rs | 78 +++-------- grin/src/adapters.rs | 97 +++---------- grin/src/miner.rs | 39 ++---- grin/src/seed.rs | 9 +- grin/src/server.rs | 94 +++---------- grin/src/sync.rs | 20 +-- p2p/src/peer.rs | 16 +-- p2p/src/store.rs | 2 +- src/bin/grin.rs | 17 +-- util/src/lib.rs | 1 + 16 files changed, 432 insertions(+), 414 deletions(-) create mode 100644 chain/src/chain.rs diff --git a/api/src/endpoints.rs b/api/src/endpoints.rs index 170fb6ce4..f813d67af 100644 --- a/api/src/endpoints.rs +++ b/api/src/endpoints.rs @@ -38,7 +38,7 @@ use util; #[derive(Clone)] pub struct ChainApi { /// data store access - chain_store: Arc, + chain: Arc, } impl ApiEndpoint for ChainApi { @@ -52,7 +52,7 @@ impl ApiEndpoint for ChainApi { } fn get(&self, id: String) -> ApiResult { - self.chain_store.head().map_err(|e| Error::Internal(e.to_string())) + self.chain.head().map_err(|e| Error::Internal(format!("{:?}", e))) } } @@ -60,8 +60,7 @@ impl ApiEndpoint for ChainApi { #[derive(Clone)] pub struct OutputApi { /// data store access - chain_store: Arc, - chain_head: Arc>, + chain: Arc, } impl ApiEndpoint for OutputApi { @@ -77,34 +76,12 @@ impl ApiEndpoint for OutputApi { fn get(&self, id: String) -> ApiResult { debug!("GET output {}", id); let c = util::from_hex(id.clone()).map_err(|e| Error::Argument(format!("Not a valid commitment: {}", id)))?; - let commitment = Commitment::from_vec(c); - // TODO use an actual UTXO tree - // in the meantime doing it the *very* expensive way: - // 1. check the output exists - // 2. run the chain back from the head to check it hasn't been spent - if let Ok(out) = self.chain_store.get_output_by_commit(&commitment) { - let mut block_h: Hash; - { - let chain_head = self.chain_head.clone(); - let head = chain_head.lock().unwrap(); - block_h = head.last_block_h; - } - loop { - let b = self.chain_store.get_block(&block_h)?; - for input in b.inputs { - if input.commitment() == commitment { - return Err(Error::NotFound); - } - } - if b.header.height == 1 { - return Ok(out); - } else { - block_h = b.header.previous; - } - } - } - Err(Error::NotFound) + match self.chain.get_unspent(&Commitment::from_vec(c)) { + Some(utxo) => Ok(utxo), + None => Err(Error::NotFound), + } + } } @@ -176,8 +153,7 @@ struct TxWrapper { /// Start all server REST APIs. Just register all of them on a ApiServer /// instance and runs the corresponding HTTP server. pub fn start_rest_apis(addr: String, - chain_store: Arc, - chain_head: Arc>, + chain: Arc, tx_pool: Arc>>) where T: pool::BlockChain + Clone + Send + Sync + 'static { @@ -185,11 +161,10 @@ pub fn start_rest_apis(addr: String, thread::spawn(move || { let mut apis = ApiServer::new("/v1".to_string()); apis.register_endpoint("/chain".to_string(), - ChainApi { chain_store: chain_store.clone() }); + ChainApi { chain: chain.clone() }); apis.register_endpoint("/chain/utxo".to_string(), OutputApi { - chain_store: chain_store.clone(), - chain_head: chain_head.clone(), + chain: chain.clone(), }); apis.register_endpoint("/pool".to_string(), PoolApi { tx_pool: tx_pool }); diff --git a/chain/src/chain.rs b/chain/src/chain.rs new file mode 100644 index 000000000..54a17bebb --- /dev/null +++ b/chain/src/chain.rs @@ -0,0 +1,224 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Facade and handler for the rest of the blockchain implementation +//! and mostly the chain pipeline. + +use std::sync::{Arc, Mutex}; + +use secp::pedersen::Commitment; + +use core::core::{Block, BlockHeader, Output}; +use core::core::target::Difficulty; +use core::core::hash::Hash; +use core::{consensus, genesis, pow}; +use grin_store; +use pipe; +use store; +use types::*; + +/// Helper macro to transform a Result into an Option with None in case +/// of error +macro_rules! none_err { + ($trying:expr) => {{ + let tried = $trying; + if let Err(_) = tried { + return None; + } + tried.unwrap() + }} +} + +/// Facade to the blockchain block processing pipeline and storage. Provides +/// the current view of the UTXO set according to the chain state. Also +/// maintains locking for the pipeline to avoid conflicting processing. +pub struct Chain { + store: Arc, + adapter: Arc, + head: Arc>, + block_process_lock: Arc>, + test_mode: bool, +} + +unsafe impl Sync for Chain {} +unsafe impl Send for Chain {} + +impl Chain { + /// Initializes the blockchain and returns a new Chain instance. Does a + /// check + /// on the current chain head to make sure it exists and creates one based + /// on + /// the genesis block if necessary. + pub fn init(test_mode: bool, + db_root: String, + adapter: Arc) + -> Result { + let chain_store = store::ChainKVStore::new(db_root)?; + + // check if we have a head in store, otherwise the genesis block is it + let head = match chain_store.head() { + Ok(tip) => tip, + Err(grin_store::Error::NotFoundErr) => { + info!("No genesis block found, creating and saving one."); + let mut gen = genesis::genesis(); + let diff = gen.header.difficulty.clone(); + let sz = if test_mode { + consensus::TEST_SIZESHIFT + } else { + consensus::DEFAULT_SIZESHIFT + }; + pow::pow_size(&mut gen.header, diff, sz as u32).unwrap(); + chain_store.save_block(&gen)?; + + // saving a new tip based on genesis + let tip = Tip::new(gen.hash()); + chain_store.save_head(&tip)?; + info!("Saved genesis block with hash {}", gen.hash()); + tip + } + Err(e) => return Err(Error::StoreErr(e)), + }; + + let head = chain_store.head()?; + + Ok(Chain { + store: Arc::new(chain_store), + adapter: adapter, + head: Arc::new(Mutex::new(head)), + block_process_lock: Arc::new(Mutex::new(true)), + test_mode: test_mode, + }) + } + + /// Attempt to add a new block to the chain. Returns the new chain tip if it + /// has been added to the longest chain, None if it's added to an (as of + /// now) + /// orphan chain. + pub fn process_block(&self, b: &Block, opts: Options) -> Result, Error> { + + let head = self.store.head().map_err(&Error::StoreErr)?; + let ctx = self.ctx_from_head(head, opts); + + let res = pipe::process_block(b, ctx); + + if let Ok(Some(ref tip)) = res { + let chain_head = self.head.clone(); + let mut head = chain_head.lock().unwrap(); + *head = tip.clone(); + } + + res + } + + /// Attempt to add a new header to the header chain. Only necessary during + /// sync. + pub fn process_block_header(&self, + bh: &BlockHeader, + opts: Options) + -> Result, Error> { + + let head = self.store.get_header_head().map_err(&Error::StoreErr)?; + let ctx = self.ctx_from_head(head, opts); + + pipe::process_block_header(bh, ctx) + } + + fn ctx_from_head(&self, head: Tip, opts: Options) -> pipe::BlockContext { + let mut opts_in = opts; + if self.test_mode { + opts_in = opts_in | EASY_POW; + } + pipe::BlockContext { + opts: opts_in, + store: self.store.clone(), + adapter: self.adapter.clone(), + head: head, + lock: self.block_process_lock.clone(), + } + } + + /// Gets an unspent output from its commitment. With return None if the + /// output + /// doesn't exist or has been spent. This querying is done in a way that's + /// constistent with the current chain state and more specifically the + /// current + /// branch it is on in case of forks. + pub fn get_unspent(&self, output_ref: &Commitment) -> Option { + // TODO use an actual UTXO tree + // in the meantime doing it the *very* expensive way: + // 1. check the output exists + // 2. run the chain back from the head to check it hasn't been spent + if let Ok(out) = self.store.get_output_by_commit(output_ref) { + let head = none_err!(self.store.head()); + let mut block_h = head.last_block_h; + loop { + let b = none_err!(self.store.get_block(&block_h)); + for input in b.inputs { + if input.commitment() == *output_ref { + return None; + } + } + if b.header.height == 1 { + return Some(out); + } else { + block_h = b.header.previous; + } + } + } + None + } + + /// Total difficulty at the head of the chain + pub fn total_difficulty(&self) -> Difficulty { + self.head.lock().unwrap().clone().total_difficulty + } + + /// Get the tip that's also the head of the chain + pub fn head(&self) -> Result { + Ok(self.head.lock().unwrap().clone()) + } + + /// Block header for the chain head + pub fn head_header(&self) -> Result { + self.store.head_header().map_err(&Error::StoreErr) + } + + /// Gets a block header by hash + pub fn get_block(&self, h: &Hash) -> Result { + self.store.get_block(h).map_err(&Error::StoreErr) + } + + /// Gets a block header by hash + pub fn get_block_header(&self, h: &Hash) -> Result { + self.store.get_block_header(h).map_err(&Error::StoreErr) + } + + /// Gets the block header at the provided height + pub fn get_header_by_height(&self, height: u64) -> Result { + self.store.get_header_by_height(height).map_err(&Error::StoreErr) + } + + /// Get the tip of the header chain + pub fn get_header_head(&self) -> Result { + self.store.get_header_head().map_err(&Error::StoreErr) + } + + /// Builds an iterator on blocks starting from the current chain head and + /// running backward. Specialized to return information pertaining to block + /// difficulty calculation (timestamp and previous difficulties). + pub fn difficulty_iter(&self) -> store::DifficultyIter { + let head = self.head.lock().unwrap(); + store::DifficultyIter::from(head.last_block_h, self.store.clone()) + } +} diff --git a/chain/src/lib.rs b/chain/src/lib.rs index cff9ef779..fe6f1cdc9 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -34,11 +34,12 @@ extern crate grin_core as core; extern crate grin_store; extern crate secp256k1zkp as secp; +mod chain; pub mod pipe; pub mod store; pub mod types; // Re-export the base interface -pub use types::{ChainStore, Tip, ChainAdapter}; -pub use pipe::{SYNC, NONE, EASY_POW, process_block, process_block_header, Options, Error}; +pub use chain::Chain; +pub use types::{ChainStore, Tip, ChainAdapter, SYNC, NONE, SKIP_POW, EASY_POW, Options, Error}; diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index df391b05f..1ec2d0ab6 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -26,130 +26,57 @@ use core::core::target::Difficulty; use core::core::{BlockHeader, Block, Proof}; use core::pow; use core::ser; -use grin_store; -use types; -use types::{Tip, ChainStore, ChainAdapter, NoopAdapter}; +use types::*; use store; -bitflags! { - /// Options for block validation - pub flags Options: u32 { - const NONE = 0b00000001, - /// Runs without checking the Proof of Work, mostly to make testing easier. - const SKIP_POW = 0b00000010, - /// Runs PoW verification with a lower cycle size. - const EASY_POW = 0b00000100, - /// Adds block while in syncing mode. - const SYNC = 0b00001000, - } -} - /// Contextual information required to process a new block and either reject or /// accept it. pub struct BlockContext { - opts: Options, - store: Arc, - adapter: Arc, - head: Tip, -} - -#[derive(Debug)] -pub enum Error { - /// The block doesn't fit anywhere in our chain - Unfit(String), - /// Difficulty is too low either compared to ours or the block PoW hash - DifficultyTooLow, - /// Addition of difficulties on all previous block is wrong - WrongTotalDifficulty, - /// Size of the Cuckoo graph in block header doesn't match PoW requirements - WrongCuckooSize, - /// The proof of work is invalid - InvalidPow, - /// The block doesn't sum correctly or a tx signature is invalid - InvalidBlockProof(secp::Error), - /// Block time is too old - InvalidBlockTime, - /// Block height is invalid (not previous + 1) - InvalidBlockHeight, - /// Internal issue when trying to save or load data from store - StoreErr(grin_store::Error), - SerErr(ser::Error), - Other(String), -} - -impl From for Error { - fn from(e: grin_store::Error) -> Error { - Error::StoreErr(e) - } -} -impl From for Error { - fn from(e: ser::Error) -> Error { - Error::SerErr(e) - } + pub opts: Options, + pub store: Arc, + pub adapter: Arc, + pub head: Tip, + pub lock: Arc>, } /// Runs the block processing pipeline, including validation and finding a /// place for the new block in the chain. Returns the new /// chain head if updated. -pub fn process_block(b: &Block, - store: Arc, - adapter: Arc, - opts: Options) - -> Result, Error> { +pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result, Error> { // TODO should just take a promise for a block with a full header so we don't // spend resources reading the full block when its header is invalid - let head = store.head().map_err(&Error::StoreErr)?; - - let mut ctx = BlockContext { - opts: opts, - store: store, - adapter: adapter, - head: head, - }; - info!("Starting validation pipeline for block {} at {} with {} inputs and {} outputs.", b.hash(), b.header.height, b.inputs.len(), b.outputs.len()); - try!(check_known(b.hash(), &mut ctx)); + check_known(b.hash(), &mut ctx)?; if !ctx.opts.intersects(SYNC) { // in sync mode, the header has already been validated - try!(validate_header(&b.header, &mut ctx)); + validate_header(&b.header, &mut ctx)?; } - try!(validate_block(b, &mut ctx)); + validate_block(b, &mut ctx)?; debug!("Block at {} with hash {} is valid, going to save and append.", b.header.height, b.hash()); - try!(add_block(b, &mut ctx)); - // TODO a global lock should be set before that step or even earlier + + ctx.lock.lock(); + add_block(b, &mut ctx)?; update_head(b, &mut ctx) } -pub fn process_block_header(bh: &BlockHeader, - store: Arc, - adapter: Arc, - opts: Options) - -> Result, Error> { - - let head = store.get_header_head().map_err(&Error::StoreErr)?; - - let mut ctx = BlockContext { - opts: opts, - store: store, - adapter: adapter, - head: head, - }; +pub fn process_block_header(bh: &BlockHeader, mut ctx: BlockContext) -> Result, Error> { info!("Starting validation pipeline for block header {} at {}.", bh.hash(), bh.height); - try!(check_known(bh.hash(), &mut ctx)); - try!(validate_header(&bh, &mut ctx)); - try!(add_block_header(bh, &mut ctx)); - // TODO a global lock should be set before that step or even earlier + check_known(bh.hash(), &mut ctx)?; + validate_header(&bh, &mut ctx)?; + add_block_header(bh, &mut ctx)?; + + ctx.lock.lock(); update_header_head(bh, &mut ctx) } diff --git a/chain/src/store.rs b/chain/src/store.rs index 281d8c5da..2a1800425 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -156,8 +156,8 @@ pub struct DifficultyIter { } impl DifficultyIter { - /// Build a new iterator using the provided chain store and starting from - /// the provided block hash. + /// Build a new iterator using the provided chain store and starting from + /// the provided block hash. pub fn from(start: Hash, store: Arc) -> DifficultyIter { DifficultyIter { next: start, diff --git a/chain/src/types.rs b/chain/src/types.rs index 306b3f27a..3daf6af1e 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -14,13 +14,63 @@ //! Base types that the block chain pipeline requires. +use secp; use secp::pedersen::Commitment; -use grin_store::Error; +use grin_store as store; use core::core::{Block, BlockHeader, Output}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use core::ser; +use grin_store; + +bitflags! { + /// Options for block validation + pub flags Options: u32 { + const NONE = 0b00000001, + /// Runs without checking the Proof of Work, mostly to make testing easier. + const SKIP_POW = 0b00000010, + /// Runs PoW verification with a lower cycle size. + const EASY_POW = 0b00000100, + /// Adds block while in syncing mode. + const SYNC = 0b00001000, + } +} + +#[derive(Debug)] +pub enum Error { + /// The block doesn't fit anywhere in our chain + Unfit(String), + /// Difficulty is too low either compared to ours or the block PoW hash + DifficultyTooLow, + /// Addition of difficulties on all previous block is wrong + WrongTotalDifficulty, + /// Size of the Cuckoo graph in block header doesn't match PoW requirements + WrongCuckooSize, + /// The proof of work is invalid + InvalidPow, + /// The block doesn't sum correctly or a tx signature is invalid + InvalidBlockProof(secp::Error), + /// Block time is too old + InvalidBlockTime, + /// Block height is invalid (not previous + 1) + InvalidBlockHeight, + /// Internal issue when trying to save or load data from store + StoreErr(grin_store::Error), + SerErr(ser::Error), + Other(String), +} + +impl From for Error { + fn from(e: grin_store::Error) -> Error { + Error::StoreErr(e) + } +} +impl From for Error { + fn from(e: ser::Error) -> Error { + Error::SerErr(e) + } +} /// The tip of a fork. A handle to the fork ancestry from its leaf in the /// blockchain tree. References the max height and the latest and previous @@ -89,53 +139,53 @@ impl ser::Readable for Tip { /// blocks. pub trait ChainStore: Send + Sync { /// Get the tip that's also the head of the chain - fn head(&self) -> Result; + fn head(&self) -> Result; /// Block header for the chain head - fn head_header(&self) -> Result; + fn head_header(&self) -> Result; /// Save the provided tip as the current head of our chain - fn save_head(&self, t: &Tip) -> Result<(), Error>; + fn save_head(&self, t: &Tip) -> Result<(), store::Error>; /// Save the provided tip as the current head of the body chain, leaving the /// header chain alone. - fn save_body_head(&self, t: &Tip) -> Result<(), Error>; + fn save_body_head(&self, t: &Tip) -> Result<(), store::Error>; /// Gets a block header by hash - fn get_block(&self, h: &Hash) -> Result; + fn get_block(&self, h: &Hash) -> Result; /// Gets a block header by hash - fn get_block_header(&self, h: &Hash) -> Result; + fn get_block_header(&self, h: &Hash) -> Result; /// Checks whether a block has been been processed and saved - fn check_block_exists(&self, h: &Hash) -> Result; + fn check_block_exists(&self, h: &Hash) -> Result; /// Save the provided block in store - fn save_block(&self, b: &Block) -> Result<(), Error>; + fn save_block(&self, b: &Block) -> Result<(), store::Error>; /// Save the provided block header in store - fn save_block_header(&self, bh: &BlockHeader) -> Result<(), Error>; + fn save_block_header(&self, bh: &BlockHeader) -> Result<(), store::Error>; /// Get the tip of the header chain - fn get_header_head(&self) -> Result; + fn get_header_head(&self) -> Result; /// Save the provided tip as the current head of the block header chain - fn save_header_head(&self, t: &Tip) -> Result<(), Error>; + fn save_header_head(&self, t: &Tip) -> Result<(), store::Error>; /// Gets the block header at the provided height - fn get_header_by_height(&self, height: u64) -> Result; + fn get_header_by_height(&self, height: u64) -> Result; /// Gets an output by its commitment - fn get_output_by_commit(&self, commit: &Commitment) -> Result; + fn get_output_by_commit(&self, commit: &Commitment) -> Result; /// Checks whether an output commitment exists and returns the output hash - fn has_output_commit(&self, commit: &Commitment) -> Result; + fn has_output_commit(&self, commit: &Commitment) -> Result; /// Saves the provided block header at the corresponding height. Also check /// the consistency of the height chain in store by assuring previous /// headers /// are also at their respective heights. - fn setup_height(&self, bh: &BlockHeader) -> Result<(), Error>; + fn setup_height(&self, bh: &BlockHeader) -> Result<(), store::Error>; } /// Bridge between the chain pipeline and the rest of the system. Handles diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 9e7b1fb4d..2062f21dd 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -35,46 +35,27 @@ use grin_core::consensus; fn mine_empty_chain() { env_logger::init(); let mut rng = OsRng::new().unwrap(); - let store = grin_chain::store::ChainKVStore::new(".grin".to_string()).unwrap(); - - // save a genesis block - let mut gen = grin_core::genesis::genesis(); - let diff = gen.header.difficulty.clone(); - pow::pow_size(&mut gen.header, diff, consensus::TEST_SIZESHIFT as u32).unwrap(); - store.save_block(&gen).unwrap(); - - // setup a new head tip - let tip = Tip::new(gen.hash()); - store.save_head(&tip).unwrap(); + let chain = grin_chain::Chain::init(true, ".grin".to_string(), Arc::new(NoopAdapter{})).unwrap(); // mine and add a few blocks - let mut prev = gen; let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let reward_key = secp::key::SecretKey::new(&secp, &mut rng); - let arc_store = Arc::new(store); - let adapter = Arc::new(NoopAdapter {}); for n in 1..4 { - let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); - b.header.timestamp = prev.header.timestamp + time::Duration::seconds(60); + let prev = chain.head_header().unwrap(); + let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); - let diff_iter = store::DifficultyIter::from(b.header.previous, arc_store.clone()); - let difficulty = consensus::next_difficulty(diff_iter).unwrap(); + let difficulty = consensus::next_difficulty(chain.difficulty_iter()).unwrap(); b.header.difficulty = difficulty.clone(); pow::pow_size(&mut b.header, difficulty, consensus::TEST_SIZESHIFT as u32).unwrap(); - grin_chain::pipe::process_block(&b, - arc_store.clone(), - adapter.clone(), - grin_chain::pipe::EASY_POW) - .unwrap(); + chain.process_block(&b, grin_chain::EASY_POW).unwrap(); // checking our new head - let head = arc_store.clone().head().unwrap(); + let head = chain.head().unwrap(); assert_eq!(head.height, n); assert_eq!(head.last_block_h, b.hash()); - - prev = b; } } @@ -82,58 +63,37 @@ fn mine_empty_chain() { fn mine_forks() { env_logger::init(); let mut rng = OsRng::new().unwrap(); - let store = grin_chain::store::ChainKVStore::new(".grin2".to_string()).unwrap(); - - // save a genesis block - let mut gen = grin_core::genesis::genesis(); - let diff = gen.header.difficulty.clone(); - pow::pow_size(&mut gen.header, diff, consensus::TEST_SIZESHIFT as u32).unwrap(); - store.save_block(&gen).unwrap(); - - // setup a new head tip - let tip = Tip::new(gen.hash()); - store.save_head(&tip).unwrap(); + let chain = grin_chain::Chain::init(true, ".grin2".to_string(), Arc::new(NoopAdapter{})).unwrap(); // mine and add a few blocks - let mut prev = gen; let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let reward_key = secp::key::SecretKey::new(&secp, &mut rng); - let arc_store = Arc::new(store); - let adapter = Arc::new(NoopAdapter {}); for n in 1..4 { - let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); - b.header.timestamp = prev.header.timestamp + time::Duration::seconds(60); + let prev = chain.head_header().unwrap(); + let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); b.header.total_difficulty = Difficulty::from_num(2*n); - grin_chain::pipe::process_block(&b, - arc_store.clone(), - adapter.clone(), - grin_chain::pipe::SKIP_POW) - .unwrap(); + chain.process_block(&b, grin_chain::SKIP_POW).unwrap(); // checking our new head thread::sleep(::std::time::Duration::from_millis(50)); - let head = arc_store.clone().head().unwrap(); + let head = chain.head().unwrap(); assert_eq!(head.height, n as u64); assert_eq!(head.last_block_h, b.hash()); assert_eq!(head.prev_block_h, prev.hash()); - let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); - b.header.timestamp = prev.header.timestamp + time::Duration::seconds(60); + // build another block with higher difficulty + let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); b.header.total_difficulty = Difficulty::from_num(2*n+1); - grin_chain::pipe::process_block(&b, - arc_store.clone(), - adapter.clone(), - grin_chain::pipe::SKIP_POW) - .unwrap(); + chain.process_block(&b, grin_chain::SKIP_POW).unwrap(); - // checking our new head + // checking head switch thread::sleep(::std::time::Duration::from_millis(50)); - let head = arc_store.clone().head().unwrap(); + let head = chain.head().unwrap(); assert_eq!(head.height, n as u64); assert_eq!(head.last_block_h, b.hash()); assert_eq!(head.prev_block_h, prev.hash()); - - prev = b; } } diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 9485e8090..7869f9193 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -33,10 +33,7 @@ use sync; /// implementations. pub struct NetToChainAdapter { test_mode: bool, - /// the reference copy of the current chain state - chain_head: Arc>, - chain_store: Arc, - chain_adapter: Arc, + chain: Arc, peer_store: Arc, tx_pool: Arc>>, @@ -45,7 +42,7 @@ pub struct NetToChainAdapter { impl NetAdapter for NetToChainAdapter { fn total_difficulty(&self) -> Difficulty { - self.chain_head.lock().unwrap().clone().total_difficulty + self.chain.total_difficulty() } fn transaction_received(&self, tx: core::Transaction) { @@ -63,17 +60,10 @@ impl NetAdapter for NetToChainAdapter { b.hash()); // pushing the new block through the chain pipeline - let store = self.chain_store.clone(); - let chain_adapter = self.chain_adapter.clone(); - let res = chain::process_block(&b, store, chain_adapter, self.chain_opts()); + let res = self.chain.process_block(&b, self.chain_opts()); - // log errors and update the shared head reference on success if let Err(e) = res { debug!("Block {} refused by chain: {:?}", b.hash(), e); - } else if let Ok(Some(tip)) = res { - let chain_head = self.chain_head.clone(); - let mut head = chain_head.lock().unwrap(); - *head = tip; } if self.syncer.borrow().syncing() { @@ -85,10 +75,7 @@ impl NetAdapter for NetToChainAdapter { // try to add each header to our header chain let mut added_hs = vec![]; for bh in bhs { - let store = self.chain_store.clone(); - let chain_adapter = self.chain_adapter.clone(); - - let res = chain::process_block_header(&bh, store, chain_adapter, self.chain_opts()); + let res = self.chain.process_block_header(&bh, self.chain_opts()); match res { Ok(_) => { added_hs.push(bh.hash()); @@ -122,10 +109,10 @@ impl NetAdapter for NetToChainAdapter { } // go through the locator vector and check if we know any of these headers - let known = self.chain_store.get_block_header(&locator[0]); + let known = self.chain.get_block_header(&locator[0]); let header = match known { Ok(header) => header, - Err(store::Error::NotFoundErr) => { + Err(chain::Error::StoreErr(store::Error::NotFoundErr)) => { return self.locate_headers(locator[1..].to_vec()); } Err(e) => { @@ -138,10 +125,10 @@ impl NetAdapter for NetToChainAdapter { let hh = header.height; let mut headers = vec![]; for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) { - let header = self.chain_store.get_header_by_height(h); + let header = self.chain.get_header_by_height(h); match header { Ok(head) => headers.push(head), - Err(store::Error::NotFoundErr) => break, + Err(chain::Error::StoreErr(store::Error::NotFoundErr)) => break, Err(e) => { error!("Could not build header locator: {:?}", e); return vec![]; @@ -153,8 +140,7 @@ impl NetAdapter for NetToChainAdapter { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option { - let store = self.chain_store.clone(); - let b = store.get_block(&h); + let b = self.chain.get_block(&h); match b { Ok(b) => Some(b), _ => None, @@ -207,17 +193,13 @@ impl NetAdapter for NetToChainAdapter { impl NetToChainAdapter { pub fn new(test_mode: bool, - chain_head: Arc>, - chain_store: Arc, - chain_adapter: Arc, + chain_ref: Arc, tx_pool: Arc>>, peer_store: Arc) -> NetToChainAdapter { NetToChainAdapter { test_mode: test_mode, - chain_head: chain_head, - chain_store: chain_store, - chain_adapter: chain_adapter, + chain: chain_ref, peer_store: peer_store, tx_pool: tx_pool, syncer: OneTime::new(), @@ -283,63 +265,26 @@ impl ChainToPoolAndNetAdapter { } /// Implements the view of the blockchain required by the TransactionPool to -/// operate. This is mostly getting information on unspent outputs in a -/// manner consistent with the chain state. +/// operate. Mostly needed to break any direct lifecycle or implementation +/// dependency between the pool and the chain. #[derive(Clone)] pub struct PoolToChainAdapter { - chain_head: Arc>, - chain_store: Arc, -} - -macro_rules! none_err { - ($trying:expr) => {{ - let tried = $trying; - if let Err(_) = tried { - return None; - } - tried.unwrap() - }} + chain: OneTime>, } impl PoolToChainAdapter { /// Create a new pool adapter - pub fn new(chain_head: Arc>, - chain_store: Arc) - -> PoolToChainAdapter { - PoolToChainAdapter { - chain_head: chain_head, - chain_store: chain_store, - } + pub fn new() -> PoolToChainAdapter { + PoolToChainAdapter { chain: OneTime::new() } + } + + pub fn set_chain(&self, chain_ref: Arc) { + self.chain.init(chain_ref); } } impl pool::BlockChain for PoolToChainAdapter { fn get_unspent(&self, output_ref: &Commitment) -> Option { - // TODO use an actual UTXO tree - // in the meantime doing it the *very* expensive way: - // 1. check the output exists - // 2. run the chain back from the head to check it hasn't been spent - if let Ok(out) = self.chain_store.get_output_by_commit(output_ref) { - let mut block_h: Hash; - { - let chain_head = self.chain_head.clone(); - let head = chain_head.lock().unwrap(); - block_h = head.last_block_h; - } - loop { - let b = none_err!(self.chain_store.get_block(&block_h)); - for input in b.inputs { - if input.commitment() == *output_ref { - return None; - } - } - if b.header.height == 1 { - return Some(out); - } else { - block_h = b.header.previous; - } - } - } - None + self.chain.borrow().get_unspent(output_ref) } } diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 5c9e09985..cf3ebafea 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -26,7 +26,7 @@ use api; use core::consensus; use core::consensus::*; use core::core; -use core::core::target::*; +use core::core::target::Difficulty; use core::core::hash::{Hash, Hashed}; use core::pow::cuckoo; use core::ser; @@ -42,10 +42,7 @@ const MAX_TX: u32 = 5000; pub struct Miner { config: MinerConfig, - chain_head: Arc>, - chain_store: Arc, - /// chain adapter to net - chain_adapter: Arc, + chain: Arc, tx_pool: Arc>>, //Just to hold the port we're on, so this miner can be identified @@ -57,16 +54,12 @@ impl Miner { /// Creates a new Miner. Needs references to the chain state and its /// storage. pub fn new(config: MinerConfig, - chain_head: Arc>, - chain_store: Arc, - chain_adapter: Arc, + chain_ref: Arc, tx_pool: Arc>>) -> Miner { Miner { config: config, - chain_head: chain_head, - chain_store: chain_store, - chain_adapter: chain_adapter, + chain: chain_ref, tx_pool: tx_pool, debug_output_id: String::from("none"), } @@ -88,12 +81,8 @@ impl Miner { let mut coinbase = self.get_coinbase(); loop { // get the latest chain state and build a block on top of it - let head: core::BlockHeader; - let mut latest_hash: Hash; - { - head = self.chain_store.head_header().unwrap(); - latest_hash = self.chain_head.lock().unwrap().last_block_h; - } + let head = self.chain.head_header().unwrap(); + let mut latest_hash = self.chain.head().unwrap().last_block_h; let mut b = self.build_block(&head, coinbase.clone()); // look for a pow for at most 2 sec on the same block (to give a chance to new @@ -128,9 +117,7 @@ impl Miner { } } b.header.nonce += 1; - { - latest_hash = self.chain_head.lock().unwrap().last_block_h; - } + latest_hash = self.chain.head().unwrap().last_block_h; iter_count += 1; //Artificial slow down @@ -149,18 +136,12 @@ impl Miner { } else { chain::NONE }; - let res = chain::process_block(&b, - self.chain_store.clone(), - self.chain_adapter.clone(), - opts); + let res = self.chain.process_block(&b, opts); if let Err(e) = res { error!("(Server ID: {}) Error validating mined block: {:?}", self.debug_output_id, e); - } else if let Ok(Some(tip)) = res { - let chain_head = self.chain_head.clone(); - let mut head = chain_head.lock().unwrap(); + } else { coinbase = self.get_coinbase(); - *head = tip; } } else { debug!("(Server ID: {}) No solution found after {} iterations, continuing...", @@ -182,7 +163,7 @@ impl Miner { now_sec += 1; } - let diff_iter = chain::store::DifficultyIter::from(head.hash(), self.chain_store.clone()); + let diff_iter = self.chain.difficulty_iter(); let difficulty = consensus::next_difficulty(diff_iter).unwrap(); let txs_box = self.tx_pool.read().unwrap().prepare_mineable_transactions(MAX_TX); diff --git a/grin/src/seed.rs b/grin/src/seed.rs index dbadebe9a..ca9ad4345 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -166,13 +166,18 @@ impl Seeder { rx: mpsc::UnboundedReceiver) -> Box> { let capab = self.capabilities; + let p2p_store = self.peer_store.clone(); let p2p_server = self.p2p.clone(); let listener = rx.for_each(move |peer_addr| { debug!("New peer address to connect to: {}.", peer_addr); let inner_h = h.clone(); if p2p_server.peer_count() < PEER_MAX_COUNT { - connect_and_req(capab, p2p_server.clone(), inner_h, peer_addr) + connect_and_req(capab, + p2p_store.clone(), + p2p_server.clone(), + inner_h, + peer_addr) } else { Box::new(future::ok(())) } @@ -222,6 +227,7 @@ pub fn predefined_seeds(addrs_str: Vec) } fn connect_and_req(capab: p2p::Capabilities, + peer_store: Arc, p2p: Arc, h: reactor::Handle, addr: SocketAddr) @@ -234,6 +240,7 @@ fn connect_and_req(capab: p2p::Capabilities, } Err(e) => { error!("Peer request error: {:?}", e); + peer_store.update_state(addr, p2p::State::Defunct); } _ => {} } diff --git a/grin/src/server.rs b/grin/src/server.rs index 57467ce62..4288dc8e3 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -45,13 +45,8 @@ pub struct Server { evt_handle: reactor::Handle, /// handle to our network server p2p: Arc, - /// the reference copy of the current chain state - chain_head: Arc>, /// data store access - chain_store: Arc, - /// chain adapter to net, required for miner and anything that submits - /// blocks - chain_adapter: Arc, + chain: Arc, /// in-memory transaction pool tx_pool: Arc>>, } @@ -84,27 +79,25 @@ impl Server { pub fn future(mut config: ServerConfig, evt_handle: &reactor::Handle) -> Result { check_config(&mut config); - let (chain_store, head) = try!(store_head(&config)); - let shared_head = Arc::new(Mutex::new(head)); - - let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); - - let pool_adapter = Arc::new(PoolToChainAdapter::new(shared_head.clone(), - chain_store.clone())); - let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(pool_adapter))); + let pool_adapter = Arc::new(PoolToChainAdapter::new()); + let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(pool_adapter.clone()))); let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); + let shared_chain = Arc::new(chain::Chain::init(config.test_mode, + config.db_root.clone(), + chain_adapter.clone())?); + pool_adapter.set_chain(shared_chain.clone()); + + let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); let net_adapter = Arc::new(NetToChainAdapter::new(config.test_mode, - shared_head.clone(), - chain_store.clone(), - chain_adapter.clone(), + shared_chain.clone(), tx_pool.clone(), peer_store.clone())); - let server = + let p2p_server = Arc::new(p2p::Server::new(config.capabilities, config.p2p_config, net_adapter.clone())); - chain_adapter.init(server.clone()); + chain_adapter.init(p2p_server.clone()); - let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), server.clone()); + let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone()); match config.seeding_type.clone() { Seeding::None => {} Seeding::List(seeds) => { @@ -115,26 +108,23 @@ impl Server { } } - let sync = sync::Syncer::new(chain_store.clone(), server.clone()); + let sync = sync::Syncer::new(shared_chain.clone(), p2p_server.clone()); net_adapter.start_sync(sync); - evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); + evt_handle.spawn(p2p_server.start(evt_handle.clone()).map_err(|_| ())); info!("Starting rest apis at: {}", &config.api_http_addr); api::start_rest_apis(config.api_http_addr.clone(), - chain_store.clone(), - shared_head.clone(), + shared_chain.clone(), tx_pool.clone()); warn!("Grin server started."); Ok(Server { config: config, evt_handle: evt_handle.clone(), - p2p: server, - chain_head: shared_head, - chain_store: chain_store, - chain_adapter: chain_adapter, + p2p: p2p_server, + chain: shared_chain, tx_pool: tx_pool, }) } @@ -153,11 +143,7 @@ impl Server { /// Start mining for blocks on a separate thread. Relies on a toy miner, /// mostly for testing. pub fn start_miner(&self, config: MinerConfig) { - let mut miner = miner::Miner::new(config, - self.chain_head.clone(), - self.chain_store.clone(), - self.chain_adapter.clone(), - self.tx_pool.clone()); + let mut miner = miner::Miner::new(config, self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}",self.config.p2p_config.port)); thread::spawn(move || { miner.run_loop(); @@ -165,9 +151,7 @@ impl Server { } pub fn head(&self) -> chain::Tip { - let head = self.chain_head.clone(); - let h = head.lock().unwrap(); - h.clone() + self.chain.head().unwrap() } /// Returns a set of stats about this server. This and the ServerStats structure @@ -182,44 +166,6 @@ impl Server { } } -// Helper function to create the chain storage and check if it already has a -// genesis block -fn store_head(config: &ServerConfig) - -> Result<(Arc, chain::Tip), Error> { - let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone()) - .map_err(&Error::Store)); - - // check if we have a head in store, otherwise the genesis block is it - let head = match chain_store.head() { - Ok(tip) => tip, - Err(store::Error::NotFoundErr) => { - info!("No genesis block found, creating and saving one."); - let mut gen = core::genesis::genesis(); - let diff = gen.header.difficulty.clone(); - core::pow::pow_size(&mut gen.header, - diff, - config.mining_config.cuckoo_size as u32) - .unwrap(); - chain_store.save_block(&gen).map_err(&Error::Store)?; - let tip = chain::types::Tip::new(gen.hash()); - chain_store.save_head(&tip).map_err(&Error::Store)?; - info!("Saved genesis block with hash {}", gen.hash()); - tip - } - Err(e) => return Err(Error::Store(e)), - }; - - let head = chain_store.head()?; - let head_header = chain_store.head_header()?; - info!("Starting server with head {} at {} and header head {} at {}", - head.last_block_h, - head.height, - head_header.hash(), - head_header.height); - - Ok((Arc::new(chain_store), head)) -} - fn check_config(config: &mut ServerConfig) { // applying test/normal config config.mining_config.cuckoo_size = if config.test_mode { diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 8f133a91a..39a19824f 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -31,7 +31,7 @@ use p2p; use types::Error; pub struct Syncer { - chain_store: Arc, + chain: Arc, p2p: Arc, sync: Mutex, @@ -41,9 +41,9 @@ pub struct Syncer { } impl Syncer { - pub fn new(chain_store: Arc, p2p: Arc) -> Syncer { + pub fn new(chain_ref: Arc, p2p: Arc) -> Syncer { Syncer { - chain_store: chain_store, + chain: chain_ref, p2p: p2p, sync: Mutex::new(true), last_header_req: Mutex::new(Instant::now() - Duration::from_secs(2)), @@ -79,7 +79,7 @@ impl Syncer { // as a peer with higher difficulty exists and we're not fully caught up info!("Starting sync loop."); loop { - let tip = self.chain_store.get_header_head()?; + let tip = self.chain.get_header_head()?; // TODO do something better (like trying to get more) if we lose peers let peer = self.p2p.most_work_peer().unwrap(); @@ -117,15 +117,15 @@ impl Syncer { /// blocks fn init_download(&self) -> Result<(), Error> { // compare the header's head to the full one to see what we're missing - let header_head = self.chain_store.get_header_head()?; - let full_head = self.chain_store.head()?; + let header_head = self.chain.get_header_head()?; + let full_head = self.chain.head()?; let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); // go back the chain and insert for download all blocks we only have the // head for let mut prev_h = header_head.last_block_h; while prev_h != full_head.last_block_h { - let header = self.chain_store.get_block_header(&prev_h)?; + let header = self.chain.get_block_header(&prev_h)?; blocks_to_download.push(header.hash()); prev_h = header.previous; } @@ -174,7 +174,7 @@ impl Syncer { *last_header_req = Instant::now(); } - let tip = self.chain_store.get_header_head()?; + let tip = self.chain.get_header_head()?; let peer = self.p2p.most_work_peer(); let locator = self.get_locator(&tip)?; if let Some(p) = peer { @@ -221,7 +221,7 @@ impl Syncer { // Iteratively travel the header chain back from our head and retain the // headers at the wanted heights. - let mut header = self.chain_store.get_block_header(&tip.last_block_h)?; + let mut header = self.chain.get_block_header(&tip.last_block_h)?; let mut locator = vec![]; while heights.len() > 0 { if header.height == heights[0] { @@ -229,7 +229,7 @@ impl Syncer { locator.push(header.hash()); } if header.height > 0 { - header = self.chain_store.get_block_header(&header.previous)?; + header = self.chain.get_block_header(&header.previous)?; } } Ok(locator) diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 562835373..dce0a0ff5 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -52,10 +52,10 @@ impl Peer { .and_then(|(conn, proto, info)| { Ok((conn, Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + info: info, + proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), + })) }); Box::new(connect_peer) } @@ -70,10 +70,10 @@ impl Peer { .and_then(|(conn, proto, info)| { Ok((conn, Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + info: info, + proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), + })) }); Box::new(hs_peer) } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 59a7b03f0..57a66af4f 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -32,7 +32,7 @@ enum_from_primitive! { pub enum State { Healthy, Banned, - Dead, + Defunct, } } diff --git a/src/bin/grin.rs b/src/bin/grin.rs index b67747299..8dcf99425 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -254,9 +254,8 @@ fn wallet_command(wallet_args: &ArgMatches) { wallet_config.check_node_api_http_addr = sa.to_string().clone(); } - match wallet_args.subcommand() { - + ("receive", Some(receive_args)) => { if let Some(f) = receive_args.value_of("input") { let mut file = File::open(f).expect("Unable to open transaction file."); @@ -264,12 +263,14 @@ fn wallet_command(wallet_args: &ArgMatches) { file.read_to_string(&mut contents).expect("Unable to read transaction file."); wallet::receive_json_tx(&wallet_config, &key, contents.as_str()).unwrap(); } else { - info!("Starting the Grin wallet receiving daemon at {}...", wallet_config.api_http_addr); + info!("Starting the Grin wallet receiving daemon at {}...", + wallet_config.api_http_addr); let mut apis = api::ApiServer::new("/v1".to_string()); - apis.register_endpoint("/receive".to_string(), wallet::WalletReceiver { - key: key, - config: wallet_config - }); + apis.register_endpoint("/receive".to_string(), + wallet::WalletReceiver { + key: key, + config: wallet_config, + }); apis.start(addr).unwrap_or_else(|e| { error!("Failed to start Grin wallet receiver: {}.", e); }); @@ -304,7 +305,7 @@ fn read_config() -> grin::ServerConfig { fn default_config() -> grin::ServerConfig { grin::ServerConfig { - test_mode: true, + test_mode: true, seeding_type: grin::Seeding::WebStatic, ..Default::default() } diff --git a/util/src/lib.rs b/util/src/lib.rs index 5256ee20f..9287f4bd3 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -26,6 +26,7 @@ pub use hex::*; // construction. This implementation will purposefully fail hard if not used // properly, for example if it's not initialized before being first used // (borrowed). +#[derive(Clone)] pub struct OneTime { inner: RefCell>, }