diff --git a/api/src/endpoints.rs b/api/src/endpoints.rs index 170fb6ce4..f813d67af 100644 --- a/api/src/endpoints.rs +++ b/api/src/endpoints.rs @@ -38,7 +38,7 @@ use util; #[derive(Clone)] pub struct ChainApi { /// data store access - chain_store: Arc, + chain: Arc, } impl ApiEndpoint for ChainApi { @@ -52,7 +52,7 @@ impl ApiEndpoint for ChainApi { } fn get(&self, id: String) -> ApiResult { - self.chain_store.head().map_err(|e| Error::Internal(e.to_string())) + self.chain.head().map_err(|e| Error::Internal(format!("{:?}", e))) } } @@ -60,8 +60,7 @@ impl ApiEndpoint for ChainApi { #[derive(Clone)] pub struct OutputApi { /// data store access - chain_store: Arc, - chain_head: Arc>, + chain: Arc, } impl ApiEndpoint for OutputApi { @@ -77,34 +76,12 @@ impl ApiEndpoint for OutputApi { fn get(&self, id: String) -> ApiResult { debug!("GET output {}", id); let c = util::from_hex(id.clone()).map_err(|e| Error::Argument(format!("Not a valid commitment: {}", id)))?; - let commitment = Commitment::from_vec(c); - // TODO use an actual UTXO tree - // in the meantime doing it the *very* expensive way: - // 1. check the output exists - // 2. run the chain back from the head to check it hasn't been spent - if let Ok(out) = self.chain_store.get_output_by_commit(&commitment) { - let mut block_h: Hash; - { - let chain_head = self.chain_head.clone(); - let head = chain_head.lock().unwrap(); - block_h = head.last_block_h; - } - loop { - let b = self.chain_store.get_block(&block_h)?; - for input in b.inputs { - if input.commitment() == commitment { - return Err(Error::NotFound); - } - } - if b.header.height == 1 { - return Ok(out); - } else { - block_h = b.header.previous; - } - } - } - Err(Error::NotFound) + match self.chain.get_unspent(&Commitment::from_vec(c)) { + Some(utxo) => Ok(utxo), + None => Err(Error::NotFound), + } + } } @@ -176,8 +153,7 @@ struct TxWrapper { /// Start all server REST APIs. Just register all of them on a ApiServer /// instance and runs the corresponding HTTP server. pub fn start_rest_apis(addr: String, - chain_store: Arc, - chain_head: Arc>, + chain: Arc, tx_pool: Arc>>) where T: pool::BlockChain + Clone + Send + Sync + 'static { @@ -185,11 +161,10 @@ pub fn start_rest_apis(addr: String, thread::spawn(move || { let mut apis = ApiServer::new("/v1".to_string()); apis.register_endpoint("/chain".to_string(), - ChainApi { chain_store: chain_store.clone() }); + ChainApi { chain: chain.clone() }); apis.register_endpoint("/chain/utxo".to_string(), OutputApi { - chain_store: chain_store.clone(), - chain_head: chain_head.clone(), + chain: chain.clone(), }); apis.register_endpoint("/pool".to_string(), PoolApi { tx_pool: tx_pool }); diff --git a/chain/src/chain.rs b/chain/src/chain.rs new file mode 100644 index 000000000..54a17bebb --- /dev/null +++ b/chain/src/chain.rs @@ -0,0 +1,224 @@ +// Copyright 2016 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Facade and handler for the rest of the blockchain implementation +//! and mostly the chain pipeline. + +use std::sync::{Arc, Mutex}; + +use secp::pedersen::Commitment; + +use core::core::{Block, BlockHeader, Output}; +use core::core::target::Difficulty; +use core::core::hash::Hash; +use core::{consensus, genesis, pow}; +use grin_store; +use pipe; +use store; +use types::*; + +/// Helper macro to transform a Result into an Option with None in case +/// of error +macro_rules! none_err { + ($trying:expr) => {{ + let tried = $trying; + if let Err(_) = tried { + return None; + } + tried.unwrap() + }} +} + +/// Facade to the blockchain block processing pipeline and storage. Provides +/// the current view of the UTXO set according to the chain state. Also +/// maintains locking for the pipeline to avoid conflicting processing. +pub struct Chain { + store: Arc, + adapter: Arc, + head: Arc>, + block_process_lock: Arc>, + test_mode: bool, +} + +unsafe impl Sync for Chain {} +unsafe impl Send for Chain {} + +impl Chain { + /// Initializes the blockchain and returns a new Chain instance. Does a + /// check + /// on the current chain head to make sure it exists and creates one based + /// on + /// the genesis block if necessary. + pub fn init(test_mode: bool, + db_root: String, + adapter: Arc) + -> Result { + let chain_store = store::ChainKVStore::new(db_root)?; + + // check if we have a head in store, otherwise the genesis block is it + let head = match chain_store.head() { + Ok(tip) => tip, + Err(grin_store::Error::NotFoundErr) => { + info!("No genesis block found, creating and saving one."); + let mut gen = genesis::genesis(); + let diff = gen.header.difficulty.clone(); + let sz = if test_mode { + consensus::TEST_SIZESHIFT + } else { + consensus::DEFAULT_SIZESHIFT + }; + pow::pow_size(&mut gen.header, diff, sz as u32).unwrap(); + chain_store.save_block(&gen)?; + + // saving a new tip based on genesis + let tip = Tip::new(gen.hash()); + chain_store.save_head(&tip)?; + info!("Saved genesis block with hash {}", gen.hash()); + tip + } + Err(e) => return Err(Error::StoreErr(e)), + }; + + let head = chain_store.head()?; + + Ok(Chain { + store: Arc::new(chain_store), + adapter: adapter, + head: Arc::new(Mutex::new(head)), + block_process_lock: Arc::new(Mutex::new(true)), + test_mode: test_mode, + }) + } + + /// Attempt to add a new block to the chain. Returns the new chain tip if it + /// has been added to the longest chain, None if it's added to an (as of + /// now) + /// orphan chain. + pub fn process_block(&self, b: &Block, opts: Options) -> Result, Error> { + + let head = self.store.head().map_err(&Error::StoreErr)?; + let ctx = self.ctx_from_head(head, opts); + + let res = pipe::process_block(b, ctx); + + if let Ok(Some(ref tip)) = res { + let chain_head = self.head.clone(); + let mut head = chain_head.lock().unwrap(); + *head = tip.clone(); + } + + res + } + + /// Attempt to add a new header to the header chain. Only necessary during + /// sync. + pub fn process_block_header(&self, + bh: &BlockHeader, + opts: Options) + -> Result, Error> { + + let head = self.store.get_header_head().map_err(&Error::StoreErr)?; + let ctx = self.ctx_from_head(head, opts); + + pipe::process_block_header(bh, ctx) + } + + fn ctx_from_head(&self, head: Tip, opts: Options) -> pipe::BlockContext { + let mut opts_in = opts; + if self.test_mode { + opts_in = opts_in | EASY_POW; + } + pipe::BlockContext { + opts: opts_in, + store: self.store.clone(), + adapter: self.adapter.clone(), + head: head, + lock: self.block_process_lock.clone(), + } + } + + /// Gets an unspent output from its commitment. With return None if the + /// output + /// doesn't exist or has been spent. This querying is done in a way that's + /// constistent with the current chain state and more specifically the + /// current + /// branch it is on in case of forks. + pub fn get_unspent(&self, output_ref: &Commitment) -> Option { + // TODO use an actual UTXO tree + // in the meantime doing it the *very* expensive way: + // 1. check the output exists + // 2. run the chain back from the head to check it hasn't been spent + if let Ok(out) = self.store.get_output_by_commit(output_ref) { + let head = none_err!(self.store.head()); + let mut block_h = head.last_block_h; + loop { + let b = none_err!(self.store.get_block(&block_h)); + for input in b.inputs { + if input.commitment() == *output_ref { + return None; + } + } + if b.header.height == 1 { + return Some(out); + } else { + block_h = b.header.previous; + } + } + } + None + } + + /// Total difficulty at the head of the chain + pub fn total_difficulty(&self) -> Difficulty { + self.head.lock().unwrap().clone().total_difficulty + } + + /// Get the tip that's also the head of the chain + pub fn head(&self) -> Result { + Ok(self.head.lock().unwrap().clone()) + } + + /// Block header for the chain head + pub fn head_header(&self) -> Result { + self.store.head_header().map_err(&Error::StoreErr) + } + + /// Gets a block header by hash + pub fn get_block(&self, h: &Hash) -> Result { + self.store.get_block(h).map_err(&Error::StoreErr) + } + + /// Gets a block header by hash + pub fn get_block_header(&self, h: &Hash) -> Result { + self.store.get_block_header(h).map_err(&Error::StoreErr) + } + + /// Gets the block header at the provided height + pub fn get_header_by_height(&self, height: u64) -> Result { + self.store.get_header_by_height(height).map_err(&Error::StoreErr) + } + + /// Get the tip of the header chain + pub fn get_header_head(&self) -> Result { + self.store.get_header_head().map_err(&Error::StoreErr) + } + + /// Builds an iterator on blocks starting from the current chain head and + /// running backward. Specialized to return information pertaining to block + /// difficulty calculation (timestamp and previous difficulties). + pub fn difficulty_iter(&self) -> store::DifficultyIter { + let head = self.head.lock().unwrap(); + store::DifficultyIter::from(head.last_block_h, self.store.clone()) + } +} diff --git a/chain/src/lib.rs b/chain/src/lib.rs index cff9ef779..fe6f1cdc9 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -34,11 +34,12 @@ extern crate grin_core as core; extern crate grin_store; extern crate secp256k1zkp as secp; +mod chain; pub mod pipe; pub mod store; pub mod types; // Re-export the base interface -pub use types::{ChainStore, Tip, ChainAdapter}; -pub use pipe::{SYNC, NONE, EASY_POW, process_block, process_block_header, Options, Error}; +pub use chain::Chain; +pub use types::{ChainStore, Tip, ChainAdapter, SYNC, NONE, SKIP_POW, EASY_POW, Options, Error}; diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index df391b05f..1ec2d0ab6 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -26,130 +26,57 @@ use core::core::target::Difficulty; use core::core::{BlockHeader, Block, Proof}; use core::pow; use core::ser; -use grin_store; -use types; -use types::{Tip, ChainStore, ChainAdapter, NoopAdapter}; +use types::*; use store; -bitflags! { - /// Options for block validation - pub flags Options: u32 { - const NONE = 0b00000001, - /// Runs without checking the Proof of Work, mostly to make testing easier. - const SKIP_POW = 0b00000010, - /// Runs PoW verification with a lower cycle size. - const EASY_POW = 0b00000100, - /// Adds block while in syncing mode. - const SYNC = 0b00001000, - } -} - /// Contextual information required to process a new block and either reject or /// accept it. pub struct BlockContext { - opts: Options, - store: Arc, - adapter: Arc, - head: Tip, -} - -#[derive(Debug)] -pub enum Error { - /// The block doesn't fit anywhere in our chain - Unfit(String), - /// Difficulty is too low either compared to ours or the block PoW hash - DifficultyTooLow, - /// Addition of difficulties on all previous block is wrong - WrongTotalDifficulty, - /// Size of the Cuckoo graph in block header doesn't match PoW requirements - WrongCuckooSize, - /// The proof of work is invalid - InvalidPow, - /// The block doesn't sum correctly or a tx signature is invalid - InvalidBlockProof(secp::Error), - /// Block time is too old - InvalidBlockTime, - /// Block height is invalid (not previous + 1) - InvalidBlockHeight, - /// Internal issue when trying to save or load data from store - StoreErr(grin_store::Error), - SerErr(ser::Error), - Other(String), -} - -impl From for Error { - fn from(e: grin_store::Error) -> Error { - Error::StoreErr(e) - } -} -impl From for Error { - fn from(e: ser::Error) -> Error { - Error::SerErr(e) - } + pub opts: Options, + pub store: Arc, + pub adapter: Arc, + pub head: Tip, + pub lock: Arc>, } /// Runs the block processing pipeline, including validation and finding a /// place for the new block in the chain. Returns the new /// chain head if updated. -pub fn process_block(b: &Block, - store: Arc, - adapter: Arc, - opts: Options) - -> Result, Error> { +pub fn process_block(b: &Block, mut ctx: BlockContext) -> Result, Error> { // TODO should just take a promise for a block with a full header so we don't // spend resources reading the full block when its header is invalid - let head = store.head().map_err(&Error::StoreErr)?; - - let mut ctx = BlockContext { - opts: opts, - store: store, - adapter: adapter, - head: head, - }; - info!("Starting validation pipeline for block {} at {} with {} inputs and {} outputs.", b.hash(), b.header.height, b.inputs.len(), b.outputs.len()); - try!(check_known(b.hash(), &mut ctx)); + check_known(b.hash(), &mut ctx)?; if !ctx.opts.intersects(SYNC) { // in sync mode, the header has already been validated - try!(validate_header(&b.header, &mut ctx)); + validate_header(&b.header, &mut ctx)?; } - try!(validate_block(b, &mut ctx)); + validate_block(b, &mut ctx)?; debug!("Block at {} with hash {} is valid, going to save and append.", b.header.height, b.hash()); - try!(add_block(b, &mut ctx)); - // TODO a global lock should be set before that step or even earlier + + ctx.lock.lock(); + add_block(b, &mut ctx)?; update_head(b, &mut ctx) } -pub fn process_block_header(bh: &BlockHeader, - store: Arc, - adapter: Arc, - opts: Options) - -> Result, Error> { - - let head = store.get_header_head().map_err(&Error::StoreErr)?; - - let mut ctx = BlockContext { - opts: opts, - store: store, - adapter: adapter, - head: head, - }; +pub fn process_block_header(bh: &BlockHeader, mut ctx: BlockContext) -> Result, Error> { info!("Starting validation pipeline for block header {} at {}.", bh.hash(), bh.height); - try!(check_known(bh.hash(), &mut ctx)); - try!(validate_header(&bh, &mut ctx)); - try!(add_block_header(bh, &mut ctx)); - // TODO a global lock should be set before that step or even earlier + check_known(bh.hash(), &mut ctx)?; + validate_header(&bh, &mut ctx)?; + add_block_header(bh, &mut ctx)?; + + ctx.lock.lock(); update_header_head(bh, &mut ctx) } diff --git a/chain/src/store.rs b/chain/src/store.rs index 281d8c5da..2a1800425 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -156,8 +156,8 @@ pub struct DifficultyIter { } impl DifficultyIter { - /// Build a new iterator using the provided chain store and starting from - /// the provided block hash. + /// Build a new iterator using the provided chain store and starting from + /// the provided block hash. pub fn from(start: Hash, store: Arc) -> DifficultyIter { DifficultyIter { next: start, diff --git a/chain/src/types.rs b/chain/src/types.rs index 306b3f27a..3daf6af1e 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -14,13 +14,63 @@ //! Base types that the block chain pipeline requires. +use secp; use secp::pedersen::Commitment; -use grin_store::Error; +use grin_store as store; use core::core::{Block, BlockHeader, Output}; use core::core::hash::{Hash, Hashed}; use core::core::target::Difficulty; use core::ser; +use grin_store; + +bitflags! { + /// Options for block validation + pub flags Options: u32 { + const NONE = 0b00000001, + /// Runs without checking the Proof of Work, mostly to make testing easier. + const SKIP_POW = 0b00000010, + /// Runs PoW verification with a lower cycle size. + const EASY_POW = 0b00000100, + /// Adds block while in syncing mode. + const SYNC = 0b00001000, + } +} + +#[derive(Debug)] +pub enum Error { + /// The block doesn't fit anywhere in our chain + Unfit(String), + /// Difficulty is too low either compared to ours or the block PoW hash + DifficultyTooLow, + /// Addition of difficulties on all previous block is wrong + WrongTotalDifficulty, + /// Size of the Cuckoo graph in block header doesn't match PoW requirements + WrongCuckooSize, + /// The proof of work is invalid + InvalidPow, + /// The block doesn't sum correctly or a tx signature is invalid + InvalidBlockProof(secp::Error), + /// Block time is too old + InvalidBlockTime, + /// Block height is invalid (not previous + 1) + InvalidBlockHeight, + /// Internal issue when trying to save or load data from store + StoreErr(grin_store::Error), + SerErr(ser::Error), + Other(String), +} + +impl From for Error { + fn from(e: grin_store::Error) -> Error { + Error::StoreErr(e) + } +} +impl From for Error { + fn from(e: ser::Error) -> Error { + Error::SerErr(e) + } +} /// The tip of a fork. A handle to the fork ancestry from its leaf in the /// blockchain tree. References the max height and the latest and previous @@ -89,53 +139,53 @@ impl ser::Readable for Tip { /// blocks. pub trait ChainStore: Send + Sync { /// Get the tip that's also the head of the chain - fn head(&self) -> Result; + fn head(&self) -> Result; /// Block header for the chain head - fn head_header(&self) -> Result; + fn head_header(&self) -> Result; /// Save the provided tip as the current head of our chain - fn save_head(&self, t: &Tip) -> Result<(), Error>; + fn save_head(&self, t: &Tip) -> Result<(), store::Error>; /// Save the provided tip as the current head of the body chain, leaving the /// header chain alone. - fn save_body_head(&self, t: &Tip) -> Result<(), Error>; + fn save_body_head(&self, t: &Tip) -> Result<(), store::Error>; /// Gets a block header by hash - fn get_block(&self, h: &Hash) -> Result; + fn get_block(&self, h: &Hash) -> Result; /// Gets a block header by hash - fn get_block_header(&self, h: &Hash) -> Result; + fn get_block_header(&self, h: &Hash) -> Result; /// Checks whether a block has been been processed and saved - fn check_block_exists(&self, h: &Hash) -> Result; + fn check_block_exists(&self, h: &Hash) -> Result; /// Save the provided block in store - fn save_block(&self, b: &Block) -> Result<(), Error>; + fn save_block(&self, b: &Block) -> Result<(), store::Error>; /// Save the provided block header in store - fn save_block_header(&self, bh: &BlockHeader) -> Result<(), Error>; + fn save_block_header(&self, bh: &BlockHeader) -> Result<(), store::Error>; /// Get the tip of the header chain - fn get_header_head(&self) -> Result; + fn get_header_head(&self) -> Result; /// Save the provided tip as the current head of the block header chain - fn save_header_head(&self, t: &Tip) -> Result<(), Error>; + fn save_header_head(&self, t: &Tip) -> Result<(), store::Error>; /// Gets the block header at the provided height - fn get_header_by_height(&self, height: u64) -> Result; + fn get_header_by_height(&self, height: u64) -> Result; /// Gets an output by its commitment - fn get_output_by_commit(&self, commit: &Commitment) -> Result; + fn get_output_by_commit(&self, commit: &Commitment) -> Result; /// Checks whether an output commitment exists and returns the output hash - fn has_output_commit(&self, commit: &Commitment) -> Result; + fn has_output_commit(&self, commit: &Commitment) -> Result; /// Saves the provided block header at the corresponding height. Also check /// the consistency of the height chain in store by assuring previous /// headers /// are also at their respective heights. - fn setup_height(&self, bh: &BlockHeader) -> Result<(), Error>; + fn setup_height(&self, bh: &BlockHeader) -> Result<(), store::Error>; } /// Bridge between the chain pipeline and the rest of the system. Handles diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 9e7b1fb4d..2062f21dd 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -35,46 +35,27 @@ use grin_core::consensus; fn mine_empty_chain() { env_logger::init(); let mut rng = OsRng::new().unwrap(); - let store = grin_chain::store::ChainKVStore::new(".grin".to_string()).unwrap(); - - // save a genesis block - let mut gen = grin_core::genesis::genesis(); - let diff = gen.header.difficulty.clone(); - pow::pow_size(&mut gen.header, diff, consensus::TEST_SIZESHIFT as u32).unwrap(); - store.save_block(&gen).unwrap(); - - // setup a new head tip - let tip = Tip::new(gen.hash()); - store.save_head(&tip).unwrap(); + let chain = grin_chain::Chain::init(true, ".grin".to_string(), Arc::new(NoopAdapter{})).unwrap(); // mine and add a few blocks - let mut prev = gen; let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let reward_key = secp::key::SecretKey::new(&secp, &mut rng); - let arc_store = Arc::new(store); - let adapter = Arc::new(NoopAdapter {}); for n in 1..4 { - let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); - b.header.timestamp = prev.header.timestamp + time::Duration::seconds(60); + let prev = chain.head_header().unwrap(); + let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); - let diff_iter = store::DifficultyIter::from(b.header.previous, arc_store.clone()); - let difficulty = consensus::next_difficulty(diff_iter).unwrap(); + let difficulty = consensus::next_difficulty(chain.difficulty_iter()).unwrap(); b.header.difficulty = difficulty.clone(); pow::pow_size(&mut b.header, difficulty, consensus::TEST_SIZESHIFT as u32).unwrap(); - grin_chain::pipe::process_block(&b, - arc_store.clone(), - adapter.clone(), - grin_chain::pipe::EASY_POW) - .unwrap(); + chain.process_block(&b, grin_chain::EASY_POW).unwrap(); // checking our new head - let head = arc_store.clone().head().unwrap(); + let head = chain.head().unwrap(); assert_eq!(head.height, n); assert_eq!(head.last_block_h, b.hash()); - - prev = b; } } @@ -82,58 +63,37 @@ fn mine_empty_chain() { fn mine_forks() { env_logger::init(); let mut rng = OsRng::new().unwrap(); - let store = grin_chain::store::ChainKVStore::new(".grin2".to_string()).unwrap(); - - // save a genesis block - let mut gen = grin_core::genesis::genesis(); - let diff = gen.header.difficulty.clone(); - pow::pow_size(&mut gen.header, diff, consensus::TEST_SIZESHIFT as u32).unwrap(); - store.save_block(&gen).unwrap(); - - // setup a new head tip - let tip = Tip::new(gen.hash()); - store.save_head(&tip).unwrap(); + let chain = grin_chain::Chain::init(true, ".grin2".to_string(), Arc::new(NoopAdapter{})).unwrap(); // mine and add a few blocks - let mut prev = gen; let secp = secp::Secp256k1::with_caps(secp::ContextFlag::Commit); let reward_key = secp::key::SecretKey::new(&secp, &mut rng); - let arc_store = Arc::new(store); - let adapter = Arc::new(NoopAdapter {}); for n in 1..4 { - let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); - b.header.timestamp = prev.header.timestamp + time::Duration::seconds(60); + let prev = chain.head_header().unwrap(); + let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); b.header.total_difficulty = Difficulty::from_num(2*n); - grin_chain::pipe::process_block(&b, - arc_store.clone(), - adapter.clone(), - grin_chain::pipe::SKIP_POW) - .unwrap(); + chain.process_block(&b, grin_chain::SKIP_POW).unwrap(); // checking our new head thread::sleep(::std::time::Duration::from_millis(50)); - let head = arc_store.clone().head().unwrap(); + let head = chain.head().unwrap(); assert_eq!(head.height, n as u64); assert_eq!(head.last_block_h, b.hash()); assert_eq!(head.prev_block_h, prev.hash()); - let mut b = core::Block::new(&prev.header, vec![], reward_key).unwrap(); - b.header.timestamp = prev.header.timestamp + time::Duration::seconds(60); + // build another block with higher difficulty + let mut b = core::Block::new(&prev, vec![], reward_key).unwrap(); + b.header.timestamp = prev.timestamp + time::Duration::seconds(60); b.header.total_difficulty = Difficulty::from_num(2*n+1); - grin_chain::pipe::process_block(&b, - arc_store.clone(), - adapter.clone(), - grin_chain::pipe::SKIP_POW) - .unwrap(); + chain.process_block(&b, grin_chain::SKIP_POW).unwrap(); - // checking our new head + // checking head switch thread::sleep(::std::time::Duration::from_millis(50)); - let head = arc_store.clone().head().unwrap(); + let head = chain.head().unwrap(); assert_eq!(head.height, n as u64); assert_eq!(head.last_block_h, b.hash()); assert_eq!(head.prev_block_h, prev.hash()); - - prev = b; } } diff --git a/grin/src/adapters.rs b/grin/src/adapters.rs index 9485e8090..7869f9193 100644 --- a/grin/src/adapters.rs +++ b/grin/src/adapters.rs @@ -33,10 +33,7 @@ use sync; /// implementations. pub struct NetToChainAdapter { test_mode: bool, - /// the reference copy of the current chain state - chain_head: Arc>, - chain_store: Arc, - chain_adapter: Arc, + chain: Arc, peer_store: Arc, tx_pool: Arc>>, @@ -45,7 +42,7 @@ pub struct NetToChainAdapter { impl NetAdapter for NetToChainAdapter { fn total_difficulty(&self) -> Difficulty { - self.chain_head.lock().unwrap().clone().total_difficulty + self.chain.total_difficulty() } fn transaction_received(&self, tx: core::Transaction) { @@ -63,17 +60,10 @@ impl NetAdapter for NetToChainAdapter { b.hash()); // pushing the new block through the chain pipeline - let store = self.chain_store.clone(); - let chain_adapter = self.chain_adapter.clone(); - let res = chain::process_block(&b, store, chain_adapter, self.chain_opts()); + let res = self.chain.process_block(&b, self.chain_opts()); - // log errors and update the shared head reference on success if let Err(e) = res { debug!("Block {} refused by chain: {:?}", b.hash(), e); - } else if let Ok(Some(tip)) = res { - let chain_head = self.chain_head.clone(); - let mut head = chain_head.lock().unwrap(); - *head = tip; } if self.syncer.borrow().syncing() { @@ -85,10 +75,7 @@ impl NetAdapter for NetToChainAdapter { // try to add each header to our header chain let mut added_hs = vec![]; for bh in bhs { - let store = self.chain_store.clone(); - let chain_adapter = self.chain_adapter.clone(); - - let res = chain::process_block_header(&bh, store, chain_adapter, self.chain_opts()); + let res = self.chain.process_block_header(&bh, self.chain_opts()); match res { Ok(_) => { added_hs.push(bh.hash()); @@ -122,10 +109,10 @@ impl NetAdapter for NetToChainAdapter { } // go through the locator vector and check if we know any of these headers - let known = self.chain_store.get_block_header(&locator[0]); + let known = self.chain.get_block_header(&locator[0]); let header = match known { Ok(header) => header, - Err(store::Error::NotFoundErr) => { + Err(chain::Error::StoreErr(store::Error::NotFoundErr)) => { return self.locate_headers(locator[1..].to_vec()); } Err(e) => { @@ -138,10 +125,10 @@ impl NetAdapter for NetToChainAdapter { let hh = header.height; let mut headers = vec![]; for h in (hh + 1)..(hh + (p2p::MAX_BLOCK_HEADERS as u64)) { - let header = self.chain_store.get_header_by_height(h); + let header = self.chain.get_header_by_height(h); match header { Ok(head) => headers.push(head), - Err(store::Error::NotFoundErr) => break, + Err(chain::Error::StoreErr(store::Error::NotFoundErr)) => break, Err(e) => { error!("Could not build header locator: {:?}", e); return vec![]; @@ -153,8 +140,7 @@ impl NetAdapter for NetToChainAdapter { /// Gets a full block by its hash. fn get_block(&self, h: Hash) -> Option { - let store = self.chain_store.clone(); - let b = store.get_block(&h); + let b = self.chain.get_block(&h); match b { Ok(b) => Some(b), _ => None, @@ -207,17 +193,13 @@ impl NetAdapter for NetToChainAdapter { impl NetToChainAdapter { pub fn new(test_mode: bool, - chain_head: Arc>, - chain_store: Arc, - chain_adapter: Arc, + chain_ref: Arc, tx_pool: Arc>>, peer_store: Arc) -> NetToChainAdapter { NetToChainAdapter { test_mode: test_mode, - chain_head: chain_head, - chain_store: chain_store, - chain_adapter: chain_adapter, + chain: chain_ref, peer_store: peer_store, tx_pool: tx_pool, syncer: OneTime::new(), @@ -283,63 +265,26 @@ impl ChainToPoolAndNetAdapter { } /// Implements the view of the blockchain required by the TransactionPool to -/// operate. This is mostly getting information on unspent outputs in a -/// manner consistent with the chain state. +/// operate. Mostly needed to break any direct lifecycle or implementation +/// dependency between the pool and the chain. #[derive(Clone)] pub struct PoolToChainAdapter { - chain_head: Arc>, - chain_store: Arc, -} - -macro_rules! none_err { - ($trying:expr) => {{ - let tried = $trying; - if let Err(_) = tried { - return None; - } - tried.unwrap() - }} + chain: OneTime>, } impl PoolToChainAdapter { /// Create a new pool adapter - pub fn new(chain_head: Arc>, - chain_store: Arc) - -> PoolToChainAdapter { - PoolToChainAdapter { - chain_head: chain_head, - chain_store: chain_store, - } + pub fn new() -> PoolToChainAdapter { + PoolToChainAdapter { chain: OneTime::new() } + } + + pub fn set_chain(&self, chain_ref: Arc) { + self.chain.init(chain_ref); } } impl pool::BlockChain for PoolToChainAdapter { fn get_unspent(&self, output_ref: &Commitment) -> Option { - // TODO use an actual UTXO tree - // in the meantime doing it the *very* expensive way: - // 1. check the output exists - // 2. run the chain back from the head to check it hasn't been spent - if let Ok(out) = self.chain_store.get_output_by_commit(output_ref) { - let mut block_h: Hash; - { - let chain_head = self.chain_head.clone(); - let head = chain_head.lock().unwrap(); - block_h = head.last_block_h; - } - loop { - let b = none_err!(self.chain_store.get_block(&block_h)); - for input in b.inputs { - if input.commitment() == *output_ref { - return None; - } - } - if b.header.height == 1 { - return Some(out); - } else { - block_h = b.header.previous; - } - } - } - None + self.chain.borrow().get_unspent(output_ref) } } diff --git a/grin/src/miner.rs b/grin/src/miner.rs index 5c9e09985..cf3ebafea 100644 --- a/grin/src/miner.rs +++ b/grin/src/miner.rs @@ -26,7 +26,7 @@ use api; use core::consensus; use core::consensus::*; use core::core; -use core::core::target::*; +use core::core::target::Difficulty; use core::core::hash::{Hash, Hashed}; use core::pow::cuckoo; use core::ser; @@ -42,10 +42,7 @@ const MAX_TX: u32 = 5000; pub struct Miner { config: MinerConfig, - chain_head: Arc>, - chain_store: Arc, - /// chain adapter to net - chain_adapter: Arc, + chain: Arc, tx_pool: Arc>>, //Just to hold the port we're on, so this miner can be identified @@ -57,16 +54,12 @@ impl Miner { /// Creates a new Miner. Needs references to the chain state and its /// storage. pub fn new(config: MinerConfig, - chain_head: Arc>, - chain_store: Arc, - chain_adapter: Arc, + chain_ref: Arc, tx_pool: Arc>>) -> Miner { Miner { config: config, - chain_head: chain_head, - chain_store: chain_store, - chain_adapter: chain_adapter, + chain: chain_ref, tx_pool: tx_pool, debug_output_id: String::from("none"), } @@ -88,12 +81,8 @@ impl Miner { let mut coinbase = self.get_coinbase(); loop { // get the latest chain state and build a block on top of it - let head: core::BlockHeader; - let mut latest_hash: Hash; - { - head = self.chain_store.head_header().unwrap(); - latest_hash = self.chain_head.lock().unwrap().last_block_h; - } + let head = self.chain.head_header().unwrap(); + let mut latest_hash = self.chain.head().unwrap().last_block_h; let mut b = self.build_block(&head, coinbase.clone()); // look for a pow for at most 2 sec on the same block (to give a chance to new @@ -128,9 +117,7 @@ impl Miner { } } b.header.nonce += 1; - { - latest_hash = self.chain_head.lock().unwrap().last_block_h; - } + latest_hash = self.chain.head().unwrap().last_block_h; iter_count += 1; //Artificial slow down @@ -149,18 +136,12 @@ impl Miner { } else { chain::NONE }; - let res = chain::process_block(&b, - self.chain_store.clone(), - self.chain_adapter.clone(), - opts); + let res = self.chain.process_block(&b, opts); if let Err(e) = res { error!("(Server ID: {}) Error validating mined block: {:?}", self.debug_output_id, e); - } else if let Ok(Some(tip)) = res { - let chain_head = self.chain_head.clone(); - let mut head = chain_head.lock().unwrap(); + } else { coinbase = self.get_coinbase(); - *head = tip; } } else { debug!("(Server ID: {}) No solution found after {} iterations, continuing...", @@ -182,7 +163,7 @@ impl Miner { now_sec += 1; } - let diff_iter = chain::store::DifficultyIter::from(head.hash(), self.chain_store.clone()); + let diff_iter = self.chain.difficulty_iter(); let difficulty = consensus::next_difficulty(diff_iter).unwrap(); let txs_box = self.tx_pool.read().unwrap().prepare_mineable_transactions(MAX_TX); diff --git a/grin/src/seed.rs b/grin/src/seed.rs index dbadebe9a..ca9ad4345 100644 --- a/grin/src/seed.rs +++ b/grin/src/seed.rs @@ -166,13 +166,18 @@ impl Seeder { rx: mpsc::UnboundedReceiver) -> Box> { let capab = self.capabilities; + let p2p_store = self.peer_store.clone(); let p2p_server = self.p2p.clone(); let listener = rx.for_each(move |peer_addr| { debug!("New peer address to connect to: {}.", peer_addr); let inner_h = h.clone(); if p2p_server.peer_count() < PEER_MAX_COUNT { - connect_and_req(capab, p2p_server.clone(), inner_h, peer_addr) + connect_and_req(capab, + p2p_store.clone(), + p2p_server.clone(), + inner_h, + peer_addr) } else { Box::new(future::ok(())) } @@ -222,6 +227,7 @@ pub fn predefined_seeds(addrs_str: Vec) } fn connect_and_req(capab: p2p::Capabilities, + peer_store: Arc, p2p: Arc, h: reactor::Handle, addr: SocketAddr) @@ -234,6 +240,7 @@ fn connect_and_req(capab: p2p::Capabilities, } Err(e) => { error!("Peer request error: {:?}", e); + peer_store.update_state(addr, p2p::State::Defunct); } _ => {} } diff --git a/grin/src/server.rs b/grin/src/server.rs index 57467ce62..4288dc8e3 100644 --- a/grin/src/server.rs +++ b/grin/src/server.rs @@ -45,13 +45,8 @@ pub struct Server { evt_handle: reactor::Handle, /// handle to our network server p2p: Arc, - /// the reference copy of the current chain state - chain_head: Arc>, /// data store access - chain_store: Arc, - /// chain adapter to net, required for miner and anything that submits - /// blocks - chain_adapter: Arc, + chain: Arc, /// in-memory transaction pool tx_pool: Arc>>, } @@ -84,27 +79,25 @@ impl Server { pub fn future(mut config: ServerConfig, evt_handle: &reactor::Handle) -> Result { check_config(&mut config); - let (chain_store, head) = try!(store_head(&config)); - let shared_head = Arc::new(Mutex::new(head)); - - let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); - - let pool_adapter = Arc::new(PoolToChainAdapter::new(shared_head.clone(), - chain_store.clone())); - let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(pool_adapter))); + let pool_adapter = Arc::new(PoolToChainAdapter::new()); + let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new(pool_adapter.clone()))); let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); + let shared_chain = Arc::new(chain::Chain::init(config.test_mode, + config.db_root.clone(), + chain_adapter.clone())?); + pool_adapter.set_chain(shared_chain.clone()); + + let peer_store = Arc::new(p2p::PeerStore::new(config.db_root.clone())?); let net_adapter = Arc::new(NetToChainAdapter::new(config.test_mode, - shared_head.clone(), - chain_store.clone(), - chain_adapter.clone(), + shared_chain.clone(), tx_pool.clone(), peer_store.clone())); - let server = + let p2p_server = Arc::new(p2p::Server::new(config.capabilities, config.p2p_config, net_adapter.clone())); - chain_adapter.init(server.clone()); + chain_adapter.init(p2p_server.clone()); - let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), server.clone()); + let seed = seed::Seeder::new(config.capabilities, peer_store.clone(), p2p_server.clone()); match config.seeding_type.clone() { Seeding::None => {} Seeding::List(seeds) => { @@ -115,26 +108,23 @@ impl Server { } } - let sync = sync::Syncer::new(chain_store.clone(), server.clone()); + let sync = sync::Syncer::new(shared_chain.clone(), p2p_server.clone()); net_adapter.start_sync(sync); - evt_handle.spawn(server.start(evt_handle.clone()).map_err(|_| ())); + evt_handle.spawn(p2p_server.start(evt_handle.clone()).map_err(|_| ())); info!("Starting rest apis at: {}", &config.api_http_addr); api::start_rest_apis(config.api_http_addr.clone(), - chain_store.clone(), - shared_head.clone(), + shared_chain.clone(), tx_pool.clone()); warn!("Grin server started."); Ok(Server { config: config, evt_handle: evt_handle.clone(), - p2p: server, - chain_head: shared_head, - chain_store: chain_store, - chain_adapter: chain_adapter, + p2p: p2p_server, + chain: shared_chain, tx_pool: tx_pool, }) } @@ -153,11 +143,7 @@ impl Server { /// Start mining for blocks on a separate thread. Relies on a toy miner, /// mostly for testing. pub fn start_miner(&self, config: MinerConfig) { - let mut miner = miner::Miner::new(config, - self.chain_head.clone(), - self.chain_store.clone(), - self.chain_adapter.clone(), - self.tx_pool.clone()); + let mut miner = miner::Miner::new(config, self.chain.clone(), self.tx_pool.clone()); miner.set_debug_output_id(format!("Port {}",self.config.p2p_config.port)); thread::spawn(move || { miner.run_loop(); @@ -165,9 +151,7 @@ impl Server { } pub fn head(&self) -> chain::Tip { - let head = self.chain_head.clone(); - let h = head.lock().unwrap(); - h.clone() + self.chain.head().unwrap() } /// Returns a set of stats about this server. This and the ServerStats structure @@ -182,44 +166,6 @@ impl Server { } } -// Helper function to create the chain storage and check if it already has a -// genesis block -fn store_head(config: &ServerConfig) - -> Result<(Arc, chain::Tip), Error> { - let chain_store = try!(chain::store::ChainKVStore::new(config.db_root.clone()) - .map_err(&Error::Store)); - - // check if we have a head in store, otherwise the genesis block is it - let head = match chain_store.head() { - Ok(tip) => tip, - Err(store::Error::NotFoundErr) => { - info!("No genesis block found, creating and saving one."); - let mut gen = core::genesis::genesis(); - let diff = gen.header.difficulty.clone(); - core::pow::pow_size(&mut gen.header, - diff, - config.mining_config.cuckoo_size as u32) - .unwrap(); - chain_store.save_block(&gen).map_err(&Error::Store)?; - let tip = chain::types::Tip::new(gen.hash()); - chain_store.save_head(&tip).map_err(&Error::Store)?; - info!("Saved genesis block with hash {}", gen.hash()); - tip - } - Err(e) => return Err(Error::Store(e)), - }; - - let head = chain_store.head()?; - let head_header = chain_store.head_header()?; - info!("Starting server with head {} at {} and header head {} at {}", - head.last_block_h, - head.height, - head_header.hash(), - head_header.height); - - Ok((Arc::new(chain_store), head)) -} - fn check_config(config: &mut ServerConfig) { // applying test/normal config config.mining_config.cuckoo_size = if config.test_mode { diff --git a/grin/src/sync.rs b/grin/src/sync.rs index 8f133a91a..39a19824f 100644 --- a/grin/src/sync.rs +++ b/grin/src/sync.rs @@ -31,7 +31,7 @@ use p2p; use types::Error; pub struct Syncer { - chain_store: Arc, + chain: Arc, p2p: Arc, sync: Mutex, @@ -41,9 +41,9 @@ pub struct Syncer { } impl Syncer { - pub fn new(chain_store: Arc, p2p: Arc) -> Syncer { + pub fn new(chain_ref: Arc, p2p: Arc) -> Syncer { Syncer { - chain_store: chain_store, + chain: chain_ref, p2p: p2p, sync: Mutex::new(true), last_header_req: Mutex::new(Instant::now() - Duration::from_secs(2)), @@ -79,7 +79,7 @@ impl Syncer { // as a peer with higher difficulty exists and we're not fully caught up info!("Starting sync loop."); loop { - let tip = self.chain_store.get_header_head()?; + let tip = self.chain.get_header_head()?; // TODO do something better (like trying to get more) if we lose peers let peer = self.p2p.most_work_peer().unwrap(); @@ -117,15 +117,15 @@ impl Syncer { /// blocks fn init_download(&self) -> Result<(), Error> { // compare the header's head to the full one to see what we're missing - let header_head = self.chain_store.get_header_head()?; - let full_head = self.chain_store.head()?; + let header_head = self.chain.get_header_head()?; + let full_head = self.chain.head()?; let mut blocks_to_download = self.blocks_to_download.lock().unwrap(); // go back the chain and insert for download all blocks we only have the // head for let mut prev_h = header_head.last_block_h; while prev_h != full_head.last_block_h { - let header = self.chain_store.get_block_header(&prev_h)?; + let header = self.chain.get_block_header(&prev_h)?; blocks_to_download.push(header.hash()); prev_h = header.previous; } @@ -174,7 +174,7 @@ impl Syncer { *last_header_req = Instant::now(); } - let tip = self.chain_store.get_header_head()?; + let tip = self.chain.get_header_head()?; let peer = self.p2p.most_work_peer(); let locator = self.get_locator(&tip)?; if let Some(p) = peer { @@ -221,7 +221,7 @@ impl Syncer { // Iteratively travel the header chain back from our head and retain the // headers at the wanted heights. - let mut header = self.chain_store.get_block_header(&tip.last_block_h)?; + let mut header = self.chain.get_block_header(&tip.last_block_h)?; let mut locator = vec![]; while heights.len() > 0 { if header.height == heights[0] { @@ -229,7 +229,7 @@ impl Syncer { locator.push(header.hash()); } if header.height > 0 { - header = self.chain_store.get_block_header(&header.previous)?; + header = self.chain.get_block_header(&header.previous)?; } } Ok(locator) diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 562835373..dce0a0ff5 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -52,10 +52,10 @@ impl Peer { .and_then(|(conn, proto, info)| { Ok((conn, Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + info: info, + proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), + })) }); Box::new(connect_peer) } @@ -70,10 +70,10 @@ impl Peer { .and_then(|(conn, proto, info)| { Ok((conn, Peer { - info: info, - proto: Box::new(proto), - state: Arc::new(RwLock::new(State::Connected)), - })) + info: info, + proto: Box::new(proto), + state: Arc::new(RwLock::new(State::Connected)), + })) }); Box::new(hs_peer) } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 59a7b03f0..57a66af4f 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -32,7 +32,7 @@ enum_from_primitive! { pub enum State { Healthy, Banned, - Dead, + Defunct, } } diff --git a/src/bin/grin.rs b/src/bin/grin.rs index b67747299..8dcf99425 100644 --- a/src/bin/grin.rs +++ b/src/bin/grin.rs @@ -254,9 +254,8 @@ fn wallet_command(wallet_args: &ArgMatches) { wallet_config.check_node_api_http_addr = sa.to_string().clone(); } - match wallet_args.subcommand() { - + ("receive", Some(receive_args)) => { if let Some(f) = receive_args.value_of("input") { let mut file = File::open(f).expect("Unable to open transaction file."); @@ -264,12 +263,14 @@ fn wallet_command(wallet_args: &ArgMatches) { file.read_to_string(&mut contents).expect("Unable to read transaction file."); wallet::receive_json_tx(&wallet_config, &key, contents.as_str()).unwrap(); } else { - info!("Starting the Grin wallet receiving daemon at {}...", wallet_config.api_http_addr); + info!("Starting the Grin wallet receiving daemon at {}...", + wallet_config.api_http_addr); let mut apis = api::ApiServer::new("/v1".to_string()); - apis.register_endpoint("/receive".to_string(), wallet::WalletReceiver { - key: key, - config: wallet_config - }); + apis.register_endpoint("/receive".to_string(), + wallet::WalletReceiver { + key: key, + config: wallet_config, + }); apis.start(addr).unwrap_or_else(|e| { error!("Failed to start Grin wallet receiver: {}.", e); }); @@ -304,7 +305,7 @@ fn read_config() -> grin::ServerConfig { fn default_config() -> grin::ServerConfig { grin::ServerConfig { - test_mode: true, + test_mode: true, seeding_type: grin::Seeding::WebStatic, ..Default::default() } diff --git a/util/src/lib.rs b/util/src/lib.rs index 5256ee20f..9287f4bd3 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -26,6 +26,7 @@ pub use hex::*; // construction. This implementation will purposefully fail hard if not used // properly, for example if it's not initialized before being first used // (borrowed). +#[derive(Clone)] pub struct OneTime { inner: RefCell>, }