compare genesis during peering handshake (#327)

* wip - send genesis in handshake
* error if genesis mismatch on handshake
* fix the tests
* preserve order of existing fields in hand/shake
This commit is contained in:
AntiochP 2017-11-20 12:35:52 -05:00 committed by Ignotus Peverell
parent 1f0808fc24
commit 00d82f2c04
11 changed files with 116 additions and 80 deletions

View file

@ -72,7 +72,7 @@ impl Chain {
pub fn init( pub fn init(
db_root: String, db_root: String,
adapter: Arc<ChainAdapter>, adapter: Arc<ChainAdapter>,
gen_block: Option<Block>, genesis: Block,
pow_verifier: fn(&BlockHeader, u32) -> bool, pow_verifier: fn(&BlockHeader, u32) -> bool,
) -> Result<Chain, Error> { ) -> Result<Chain, Error> {
let chain_store = store::ChainKVStore::new(db_root.clone())?; let chain_store = store::ChainKVStore::new(db_root.clone())?;
@ -81,29 +81,30 @@ impl Chain {
let head = match chain_store.head() { let head = match chain_store.head() {
Ok(tip) => tip, Ok(tip) => tip,
Err(NotFoundErr) => { Err(NotFoundErr) => {
if let None = gen_block { chain_store.save_block(&genesis)?;
return Err(Error::GenesisBlockRequired); chain_store.setup_height(&genesis.header)?;
}
let gen = gen_block.unwrap();
chain_store.save_block(&gen)?;
chain_store.setup_height(&gen.header)?;
// saving a new tip based on genesis // saving a new tip based on genesis
let tip = Tip::new(gen.hash()); let tip = Tip::new(genesis.hash());
chain_store.save_head(&tip)?; chain_store.save_head(&tip)?;
info!( info!(
LOGGER, LOGGER,
"Saved genesis block with hash: {:?}, nonce: {:?}, pow: {:?}", "Saved genesis block: {:?}, nonce: {:?}, pow: {:?}",
gen.hash(), genesis.hash(),
gen.header.nonce, genesis.header.nonce,
gen.header.pow, genesis.header.pow,
); );
tip tip
} }
Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())), Err(e) => return Err(Error::StoreErr(e, "chain init load head".to_owned())),
}; };
info!(
LOGGER,
"Chain init: {:?}",
head,
);
let store = Arc::new(chain_store); let store = Arc::new(chain_store);
let sumtrees = sumtree::SumTrees::open(db_root, store.clone())?; let sumtrees = sumtree::SumTrees::open(db_root, store.clone())?;

View file

@ -44,10 +44,7 @@ fn setup(dir_name: &str) -> Chain {
let _ = env_logger::init(); let _ = env_logger::init();
clean_output_dir(dir_name); clean_output_dir(dir_name);
global::set_mining_mode(ChainTypes::AutomatedTesting); global::set_mining_mode(ChainTypes::AutomatedTesting);
let mut genesis_block = None; let genesis_block = pow::mine_genesis_block(None).unwrap();
if !chain::Chain::chain_exists(dir_name.to_string()) {
genesis_block = pow::mine_genesis_block(None);
}
chain::Chain::init( chain::Chain::init(
dir_name.to_string(), dir_name.to_string(),
Arc::new(NoopAdapter {}), Arc::new(NoopAdapter {}),

View file

@ -44,10 +44,8 @@ fn test_coinbase_maturity() {
clean_output_dir(".grin"); clean_output_dir(".grin");
global::set_mining_mode(ChainTypes::AutomatedTesting); global::set_mining_mode(ChainTypes::AutomatedTesting);
let mut genesis_block = None; let genesis_block = pow::mine_genesis_block(None).unwrap();
if !chain::Chain::chain_exists(".grin".to_string()) {
genesis_block = pow::mine_genesis_block(None);
}
let chain = chain::Chain::init( let chain = chain::Chain::init(
".grin".to_string(), ".grin".to_string(),
Arc::new(NoopAdapter {}), Arc::new(NoopAdapter {}),

View file

@ -89,20 +89,20 @@ impl Server {
let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone()));
let mut genesis_block = None; let genesis = match config.chain_type {
if !chain::Chain::chain_exists(config.db_root.clone()) { global::ChainTypes::Testnet1 => genesis::genesis_testnet1(),
let chain_type = config.chain_type.clone(); _ => pow::mine_genesis_block(config.mining_config.clone())?,
if chain_type == global::ChainTypes::Testnet1 { };
genesis_block = Some(genesis::genesis_testnet1()); info!(
} else { LOGGER,
genesis_block = pow::mine_genesis_block(config.mining_config.clone()); "Starting server, genesis block: {}",
} genesis.hash(),
} );
let shared_chain = Arc::new(chain::Chain::init( let shared_chain = Arc::new(chain::Chain::init(
config.db_root.clone(), config.db_root.clone(),
chain_adapter.clone(), chain_adapter.clone(),
genesis_block, genesis.clone(),
pow::verify_size, pow::verify_size,
)?); )?);
@ -114,11 +114,14 @@ impl Server {
tx_pool.clone(), tx_pool.clone(),
peer_store.clone(), peer_store.clone(),
)); ));
let p2p_server = Arc::new(p2p::Server::new( let p2p_server = Arc::new(p2p::Server::new(
config.capabilities, config.capabilities,
config.p2p_config.unwrap(), config.p2p_config.unwrap(),
net_adapter.clone(), net_adapter.clone(),
genesis.hash(),
)); ));
chain_adapter.init(p2p_server.clone()); chain_adapter.init(p2p_server.clone());
pool_net_adapter.init(p2p_server.clone()); pool_net_adapter.init(p2p_server.clone());

View file

@ -36,6 +36,7 @@ pub enum Error {
API(api::Error), API(api::Error),
/// Error originating from wallet API. /// Error originating from wallet API.
Wallet(wallet::Error), Wallet(wallet::Error),
Cuckoo(pow::cuckoo::Error),
} }
impl From<chain::Error> for Error { impl From<chain::Error> for Error {
@ -50,6 +51,12 @@ impl From<p2p::Error> for Error {
} }
} }
impl From<pow::cuckoo::Error> for Error {
fn from(e: pow::cuckoo::Error) -> Error {
Error::Cuckoo(e)
}
}
impl From<store::Error> for Error { impl From<store::Error> for Error {
fn from(e: store::Error) -> Error { fn from(e: store::Error) -> Error {
Error::Store(e) Error::Store(e)

View file

@ -22,7 +22,7 @@ use rand::os::OsRng;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::ser; use core::core::hash::Hash;
use msg::*; use msg::*;
use types::*; use types::*;
use protocol::ProtocolV1; use protocol::ProtocolV1;
@ -36,6 +36,9 @@ pub struct Handshake {
/// Ring buffer of nonces sent to detect self connections without requiring /// Ring buffer of nonces sent to detect self connections without requiring
/// a node id. /// a node id.
nonces: Arc<RwLock<VecDeque<u64>>>, nonces: Arc<RwLock<VecDeque<u64>>>,
/// The genesis block header of the chain seen by this node.
/// We only want to connect to other nodes seeing the same chain (forks are ok).
genesis: Hash,
} }
unsafe impl Sync for Handshake {} unsafe impl Sync for Handshake {}
@ -43,9 +46,10 @@ unsafe impl Send for Handshake {}
impl Handshake { impl Handshake {
/// Creates a new handshake handler /// Creates a new handshake handler
pub fn new() -> Handshake { pub fn new(genesis: Hash) -> Handshake {
Handshake { Handshake {
nonces: Arc::new(RwLock::new(VecDeque::with_capacity(NONCES_CAP))), nonces: Arc::new(RwLock::new(VecDeque::with_capacity(NONCES_CAP))),
genesis: genesis,
} }
} }
@ -64,34 +68,35 @@ impl Handshake {
Ok(pa) => pa, Ok(pa) => pa,
Err(e) => return Box::new(futures::future::err(Error::Connection(e))), Err(e) => return Box::new(futures::future::err(Error::Connection(e))),
}; };
debug!(
LOGGER,
"handshake connect with nonce - {}, sender - {:?}, receiver - {:?}",
nonce,
self_addr,
peer_addr,
);
let hand = Hand { let hand = Hand {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
capabilities: capab, capabilities: capab,
nonce: nonce, nonce: nonce,
genesis: self.genesis,
total_difficulty: total_difficulty, total_difficulty: total_difficulty,
sender_addr: SockAddr(self_addr), sender_addr: SockAddr(self_addr),
receiver_addr: SockAddr(peer_addr), receiver_addr: SockAddr(peer_addr),
user_agent: USER_AGENT.to_string(), user_agent: USER_AGENT.to_string(),
}; };
let genesis = self.genesis;
// write and read the handshake response // write and read the handshake response
Box::new( Box::new(
write_msg(conn, hand, Type::Hand) write_msg(conn, hand, Type::Hand)
.and_then(|conn| read_msg::<Shake>(conn)) .and_then(|conn| read_msg::<Shake>(conn))
.and_then(move |(conn, shake)| { .and_then(move |(conn, shake)| {
if shake.version != 1 { if shake.version != PROTOCOL_VERSION {
Err(Error::Serialization(ser::Error::UnexpectedData { Err(Error::ProtocolMismatch {
expected: vec![PROTOCOL_VERSION as u8], us: PROTOCOL_VERSION,
received: vec![shake.version as u8], peer: shake.version,
})) })
} else if shake.genesis != genesis {
Err(Error::GenesisMismatch {
us: genesis,
peer: shake.genesis,
})
} else { } else {
let peer_info = PeerInfo { let peer_info = PeerInfo {
capabilities: shake.capabilities, capabilities: shake.capabilities,
@ -102,6 +107,7 @@ impl Handshake {
}; };
info!(LOGGER, "Connected to peer {:?}", peer_info); info!(LOGGER, "Connected to peer {:?}", peer_info);
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
Ok((conn, ProtocolV1::new(), peer_info)) Ok((conn, ProtocolV1::new(), peer_info))
} }
@ -118,30 +124,25 @@ impl Handshake {
conn: TcpStream, conn: TcpStream,
) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> { ) -> Box<Future<Item = (TcpStream, ProtocolV1, PeerInfo), Error = Error>> {
let nonces = self.nonces.clone(); let nonces = self.nonces.clone();
let genesis = self.genesis.clone();
Box::new( Box::new(
read_msg::<Hand>(conn) read_msg::<Hand>(conn)
.and_then(move |(conn, hand)| { .and_then(move |(conn, hand)| {
if hand.version != 1 { if hand.version != PROTOCOL_VERSION {
return Err(Error::Serialization(ser::Error::UnexpectedData { return Err(Error::ProtocolMismatch {
expected: vec![PROTOCOL_VERSION as u8], us: PROTOCOL_VERSION,
received: vec![hand.version as u8], peer: hand.version,
})); });
} } else if hand.genesis != genesis {
{ return Err(Error::GenesisMismatch {
// check the nonce to see if we could be trying to connect to ourselves us: genesis,
peer: hand.genesis,
});
} else {
// check the nonce to see if we are trying to connect to ourselves
let nonces = nonces.read().unwrap(); let nonces = nonces.read().unwrap();
debug!(
LOGGER,
"checking the nonce - {}, {:?}",
&hand.nonce,
nonces,
);
if nonces.contains(&hand.nonce) { if nonces.contains(&hand.nonce) {
debug!(LOGGER, "***** nonce matches! Avoiding connecting to ourselves"); return Err(Error::PeerWithSelf);
return Err(Error::Serialization(ser::Error::UnexpectedData {
expected: vec![],
received: vec![],
}));
} }
} }
@ -157,6 +158,7 @@ impl Handshake {
let shake = Shake { let shake = Shake {
version: PROTOCOL_VERSION, version: PROTOCOL_VERSION,
capabilities: capab, capabilities: capab,
genesis: genesis,
total_difficulty: total_difficulty, total_difficulty: total_difficulty,
user_agent: USER_AGENT.to_string(), user_agent: USER_AGENT.to_string(),
}; };
@ -165,7 +167,7 @@ impl Handshake {
.and_then(|(conn, shake, peer_info)| { .and_then(|(conn, shake, peer_info)| {
debug!(LOGGER, "Success handshake with {}.", peer_info.addr); debug!(LOGGER, "Success handshake with {}.", peer_info.addr);
write_msg(conn, shake, Type::Shake) write_msg(conn, shake, Type::Shake)
// when more than one protocol version is supported, choosing should go here // when more than one protocol version is supported, choosing should go here
.map(|conn| (conn, ProtocolV1::new(), peer_info)) .map(|conn| (conn, ProtocolV1::new(), peer_info))
}), }),
) )
@ -193,8 +195,6 @@ fn extract_ip(advertised: &SocketAddr, conn: &TcpStream) -> SocketAddr {
match advertised { match advertised {
&SocketAddr::V4(v4sock) => { &SocketAddr::V4(v4sock) => {
let ip = v4sock.ip(); let ip = v4sock.ip();
debug!(LOGGER, "extract_ip - {:?}, {:?}", ip, conn.peer_addr());
if ip.is_loopback() || ip.is_unspecified() { if ip.is_loopback() || ip.is_unspecified() {
if let Ok(addr) = conn.peer_addr() { if let Ok(addr) = conn.peer_addr() {
return SocketAddr::new(addr.ip(), advertised.port()); return SocketAddr::new(addr.ip(), advertised.port());

View file

@ -131,7 +131,7 @@ pub struct MsgHeader {
magic: [u8; 2], magic: [u8; 2],
/// Type of the message. /// Type of the message.
pub msg_type: Type, pub msg_type: Type,
/// Tota length of the message in bytes. /// Total length of the message in bytes.
pub msg_len: u64, pub msg_len: u64,
} }
@ -189,6 +189,8 @@ pub struct Hand {
pub capabilities: Capabilities, pub capabilities: Capabilities,
/// randomly generated for each handshake, helps detect self /// randomly generated for each handshake, helps detect self
pub nonce: u64, pub nonce: u64,
/// genesis block of our chain, only connect to peers on the same chain
pub genesis: Hash,
/// total difficulty accumulated by the sender, used to check whether sync /// total difficulty accumulated by the sender, used to check whether sync
/// may be needed /// may be needed
pub total_difficulty: Difficulty, pub total_difficulty: Difficulty,
@ -211,23 +213,27 @@ impl Writeable for Hand {
self.total_difficulty.write(writer).unwrap(); self.total_difficulty.write(writer).unwrap();
self.sender_addr.write(writer).unwrap(); self.sender_addr.write(writer).unwrap();
self.receiver_addr.write(writer).unwrap(); self.receiver_addr.write(writer).unwrap();
writer.write_bytes(&self.user_agent) writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
Ok(())
} }
} }
impl Readable for Hand { impl Readable for Hand {
fn read(reader: &mut Reader) -> Result<Hand, ser::Error> { fn read(reader: &mut Reader) -> Result<Hand, ser::Error> {
let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64); let (version, capab, nonce) = ser_multiread!(reader, read_u32, read_u32, read_u64);
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData,));
let total_diff = try!(Difficulty::read(reader)); let total_diff = try!(Difficulty::read(reader));
let sender_addr = try!(SockAddr::read(reader)); let sender_addr = try!(SockAddr::read(reader));
let receiver_addr = try!(SockAddr::read(reader)); let receiver_addr = try!(SockAddr::read(reader));
let ua = try!(reader.read_vec()); let ua = try!(reader.read_vec());
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData,)); let genesis = try!(Hash::read(reader));
Ok(Hand { Ok(Hand {
version: version, version: version,
capabilities: capabilities, capabilities: capabilities,
nonce: nonce, nonce: nonce,
genesis: genesis,
total_difficulty: total_diff, total_difficulty: total_diff,
sender_addr: sender_addr, sender_addr: sender_addr,
receiver_addr: receiver_addr, receiver_addr: receiver_addr,
@ -243,6 +249,8 @@ pub struct Shake {
pub version: u32, pub version: u32,
/// sender capabilities /// sender capabilities
pub capabilities: Capabilities, pub capabilities: Capabilities,
/// genesis block of our chain, only connect to peers on the same chain
pub genesis: Hash,
/// total difficulty accumulated by the sender, used to check whether sync /// total difficulty accumulated by the sender, used to check whether sync
/// may be needed /// may be needed
pub total_difficulty: Difficulty, pub total_difficulty: Difficulty,
@ -259,6 +267,7 @@ impl Writeable for Shake {
); );
self.total_difficulty.write(writer).unwrap(); self.total_difficulty.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap(); writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
Ok(()) Ok(())
} }
} }
@ -266,13 +275,15 @@ impl Writeable for Shake {
impl Readable for Shake { impl Readable for Shake {
fn read(reader: &mut Reader) -> Result<Shake, ser::Error> { fn read(reader: &mut Reader) -> Result<Shake, ser::Error> {
let (version, capab) = ser_multiread!(reader, read_u32, read_u32); let (version, capab) = ser_multiread!(reader, read_u32, read_u32);
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData,));
let total_diff = try!(Difficulty::read(reader)); let total_diff = try!(Difficulty::read(reader));
let ua = try!(reader.read_vec()); let ua = try!(reader.read_vec());
let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData)); let user_agent = try!(String::from_utf8(ua).map_err(|_| ser::Error::CorruptedData));
let capabilities = try!(Capabilities::from_bits(capab).ok_or(ser::Error::CorruptedData,)); let genesis = try!(Hash::read(reader));
Ok(Shake { Ok(Shake {
version: version, version: version,
capabilities: capabilities, capabilities: capabilities,
genesis: genesis,
total_difficulty: total_diff, total_difficulty: total_diff,
user_agent: user_agent, user_agent: user_agent,
}) })

View file

@ -77,12 +77,17 @@ unsafe impl Send for Server {}
// TODO TLS // TODO TLS
impl Server { impl Server {
/// Creates a new idle p2p server with no peers /// Creates a new idle p2p server with no peers
pub fn new(capab: Capabilities, config: P2PConfig, adapter: Arc<NetAdapter>) -> Server { pub fn new(
capab: Capabilities,
config: P2PConfig,
adapter: Arc<NetAdapter>,
genesis: Hash,
) -> Server {
Server { Server {
config: config, config: config,
capabilities: capab, capabilities: capab,
peers: Arc::new(RwLock::new(HashMap::new())), peers: Arc::new(RwLock::new(HashMap::new())),
handshake: Arc::new(Handshake::new()), handshake: Arc::new(Handshake::new(genesis)),
adapter: adapter, adapter: adapter,
stop: RefCell::new(None), stop: RefCell::new(None),
} }
@ -180,8 +185,6 @@ impl Server {
let capab = self.capabilities.clone(); let capab = self.capabilities.clone();
let self_addr = SocketAddr::new(self.config.host, self.config.port); let self_addr = SocketAddr::new(self.config.host, self.config.port);
debug!(LOGGER, "{} connecting to {}", self_addr, addr);
let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::Connection(e)); let socket = TcpStream::connect(&addr, &h).map_err(|e| Error::Connection(e));
let h2 = h.clone(); let h2 = h.clone();
let request = socket let request = socket

View file

@ -45,6 +45,15 @@ pub enum Error {
Connection(io::Error), Connection(io::Error),
ConnectionClose, ConnectionClose,
Timeout, Timeout,
PeerWithSelf,
ProtocolMismatch {
us: u32,
peer: u32,
},
GenesisMismatch {
us: Hash,
peer: Hash,
},
} }
impl From<ser::Error> for Error { impl From<ser::Error> for Error {

View file

@ -26,6 +26,7 @@ use tokio_core::net::TcpStream;
use tokio_core::reactor::{self, Core}; use tokio_core::reactor::{self, Core};
use core::core::target::Difficulty; use core::core::target::Difficulty;
use core::core::hash::Hash;
use p2p::Peer; use p2p::Peer;
// Starts a server and connects a client peer to it to check handshake, // Starts a server and connects a client peer to it to check handshake,
@ -36,7 +37,12 @@ fn peer_handshake() {
let handle = evtlp.handle(); let handle = evtlp.handle();
let p2p_conf = p2p::P2PConfig::default(); let p2p_conf = p2p::P2PConfig::default();
let net_adapter = Arc::new(p2p::DummyAdapter {}); let net_adapter = Arc::new(p2p::DummyAdapter {});
let server = p2p::Server::new(p2p::UNKNOWN, p2p_conf, net_adapter.clone()); let server = p2p::Server::new(
p2p::UNKNOWN,
p2p_conf,
net_adapter.clone(),
Hash::from_vec(vec![]),
);
let run_server = server.start(handle.clone()); let run_server = server.start(handle.clone());
let my_addr = "127.0.0.1:5000".parse().unwrap(); let my_addr = "127.0.0.1:5000".parse().unwrap();
@ -59,7 +65,7 @@ fn peer_handshake() {
p2p::UNKNOWN, p2p::UNKNOWN,
Difficulty::one(), Difficulty::one(),
my_addr, my_addr,
Arc::new(p2p::handshake::Handshake::new()), Arc::new(p2p::handshake::Handshake::new(Hash::from_vec(vec![]))),
net_adapter.clone(), net_adapter.clone(),
) )
}) })

View file

@ -95,8 +95,9 @@ pub fn pow20<T: MiningWorker>(
/// Mines a genesis block, using the config specified miner if specified. /// Mines a genesis block, using the config specified miner if specified.
/// Otherwise, uses the internal miner /// Otherwise, uses the internal miner
pub fn mine_genesis_block(miner_config: Option<types::MinerConfig>) -> Option<core::core::Block> { pub fn mine_genesis_block(
info!(util::LOGGER, "Genesis block not found, initializing..."); miner_config: Option<types::MinerConfig>,
) -> Result<core::core::Block, Error> {
let mut gen = genesis::genesis_dev(); let mut gen = genesis::genesis_dev();
let diff = gen.header.difficulty.clone(); let diff = gen.header.difficulty.clone();
@ -114,7 +115,7 @@ pub fn mine_genesis_block(miner_config: Option<types::MinerConfig>) -> Option<co
None => Box::new(cuckoo::Miner::new(consensus::EASINESS, sz, proof_size)), None => Box::new(cuckoo::Miner::new(consensus::EASINESS, sz, proof_size)),
}; };
pow_size(&mut *miner, &mut gen.header, diff, sz as u32).unwrap(); pow_size(&mut *miner, &mut gen.header, diff, sz as u32).unwrap();
Some(gen) Ok(gen)
} }
/// Runs a proof of work computation over the provided block using the provided /// Runs a proof of work computation over the provided block using the provided