diff --git a/libsigner/src/events.rs b/libsigner/src/events.rs index bba3df86d8..bb236e5c9e 100644 --- a/libsigner/src/events.rs +++ b/libsigner/src/events.rs @@ -214,6 +214,8 @@ pub enum SignerEvent { consensus_hash: ConsensusHash, /// the time at which this event was received by the signer's event processor received_time: SystemTime, + /// the parent burn block hash for the newly processed burn block + parent_burn_block_hash: BurnchainHeaderHash, }, /// A new processed Stacks block was received from the node with the given block hash NewBlock { @@ -585,6 +587,8 @@ struct BurnBlockEvent { burn_amount: u64, #[serde(with = "prefix_hex")] consensus_hash: ConsensusHash, + #[serde(with = "prefix_hex")] + parent_burn_block_hash: BurnchainHeaderHash, } impl TryFrom for SignerEvent { @@ -596,6 +600,7 @@ impl TryFrom for SignerEvent { received_time: SystemTime::now(), burn_header_hash: burn_block_event.burn_block_hash, consensus_hash: burn_block_event.consensus_hash, + parent_burn_block_hash: burn_block_event.parent_burn_block_hash, }) } } diff --git a/stacks-signer/src/client/stacks_client.rs b/stacks-signer/src/client/stacks_client.rs index 47b538679c..2e3712a543 100644 --- a/stacks-signer/src/client/stacks_client.rs +++ b/stacks-signer/src/client/stacks_client.rs @@ -452,6 +452,22 @@ impl StacksClient { }) } + /// Get the sortition info for a given consensus hash + pub fn get_sortition_by_consensus_hash( + &self, + consensus_hash: &ConsensusHash, + ) -> Result { + let path = self.sortition_by_consensus_hash_path(consensus_hash); + let response = self.stacks_node_client.get(&path).send()?; + if !response.status().is_success() { + return Err(ClientError::RequestFailure(response.status())); + } + let sortition_info = response.json::>()?; + sortition_info.first().cloned().ok_or_else(|| { + ClientError::InvalidResponse("No sortition info found for given consensus hash".into()) + }) + } + /// Get the current peer info data from the stacks node pub fn get_peer_info(&self) -> 
Result { debug!("StacksClient: Getting peer info"); @@ -725,6 +741,14 @@ impl StacksClient { format!("{}{RPC_SORTITION_INFO_PATH}", self.http_origin) } + fn sortition_by_consensus_hash_path(&self, consensus_hash: &ConsensusHash) -> String { + format!( + "{}{RPC_SORTITION_INFO_PATH}/consensus/{}", + self.http_origin, + consensus_hash.to_hex() + ) + } + fn tenure_forking_info_path(&self, start: &ConsensusHash, stop: &ConsensusHash) -> String { format!( "{}{RPC_TENURE_FORKING_INFO_PATH}/{}/{}", diff --git a/stacks-signer/src/signerdb.rs b/stacks-signer/src/signerdb.rs index 08f4679cbe..1d0014a8e6 100644 --- a/stacks-signer/src/signerdb.rs +++ b/stacks-signer/src/signerdb.rs @@ -21,12 +21,12 @@ use std::time::{Duration, SystemTime}; use blockstack_lib::chainstate::nakamoto::NakamotoBlock; use blockstack_lib::chainstate::stacks::TransactionPayload; +#[cfg(any(test, feature = "testing"))] +use blockstack_lib::util_lib::db::FromColumn; use blockstack_lib::util_lib::db::{ query_row, query_rows, sqlite_open, table_exists, tx_begin_immediate, u64_to_sql, - Error as DBError, + Error as DBError, FromRow, }; -#[cfg(any(test, feature = "testing"))] -use blockstack_lib::util_lib::db::{FromColumn, FromRow}; use clarity::types::chainstate::{BurnchainHeaderHash, StacksAddress}; use clarity::types::Address; use libsigner::v0::messages::{RejectReason, RejectReasonPrefix, StateMachineUpdate}; @@ -72,6 +72,34 @@ impl StacksMessageCodec for NakamotoBlockVote { } } +#[derive(Serialize, Deserialize, Debug, PartialEq)] +/// Struct for storing information about a burn block +pub struct BurnBlockInfo { + /// The hash of the burn block + pub block_hash: BurnchainHeaderHash, + /// The height of the burn block + pub block_height: u64, + /// The consensus hash of the burn block + pub consensus_hash: ConsensusHash, + /// The hash of the parent burn block + pub parent_burn_block_hash: BurnchainHeaderHash, +} + +impl FromRow for BurnBlockInfo { + fn from_row(row: &rusqlite::Row) -> Result { + let 
block_hash: BurnchainHeaderHash = row.get(0)?; + let block_height: u64 = row.get(1)?; + let consensus_hash: ConsensusHash = row.get(2)?; + let parent_burn_block_hash: BurnchainHeaderHash = row.get(3)?; + Ok(BurnBlockInfo { + block_hash, + block_height, + consensus_hash, + parent_burn_block_hash, + }) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Default)] /// Store extra version-specific info in `BlockInfo` pub enum ExtraBlockInfo { @@ -566,6 +594,15 @@ CREATE TABLE IF NOT EXISTS signer_state_machine_updates ( PRIMARY KEY (signer_addr, reward_cycle) ) STRICT;"#; +static ADD_PARENT_BURN_BLOCK_HASH: &str = r#" + ALTER TABLE burn_blocks + ADD COLUMN parent_burn_block_hash TEXT; +"#; + +static ADD_PARENT_BURN_BLOCK_HASH_INDEX: &str = r#" +CREATE INDEX IF NOT EXISTS burn_blocks_parent_burn_block_hash_idx on burn_blocks (parent_burn_block_hash); +"#; + static SCHEMA_1: &[&str] = &[ DROP_SCHEMA_0, CREATE_DB_CONFIG, @@ -652,6 +689,12 @@ static SCHEMA_12: &[&str] = &[ "INSERT OR REPLACE INTO db_config (version) VALUES (12);", ]; +static SCHEMA_13: &[&str] = &[ + ADD_PARENT_BURN_BLOCK_HASH, + ADD_PARENT_BURN_BLOCK_HASH_INDEX, + "INSERT INTO db_config (version) VALUES (13);", +]; + impl SignerDb { /// The current schema version used in this build of the signer binary. pub const SCHEMA_VERSION: u32 = 12; @@ -852,6 +895,20 @@ impl SignerDb { Ok(()) } + /// Migrate from schema 12 to schema 13 + fn schema_13_migration(tx: &Transaction) -> Result<(), DBError> { + if Self::get_schema_version(tx)? 
>= 13 { + // no migration necessary + return Ok(()); + } + + for statement in SCHEMA_13.iter() { + tx.execute_batch(statement)?; + } + + Ok(()) + } + /// Register custom scalar functions used by the database fn register_scalar_functions(&self) -> Result<(), DBError> { // Register helper function for determining if a block is a tenure change transaction @@ -897,7 +954,8 @@ impl SignerDb { 9 => Self::schema_10_migration(&sql_tx)?, 10 => Self::schema_11_migration(&sql_tx)?, 11 => Self::schema_12_migration(&sql_tx)?, - 12 => break, + 12 => Self::schema_13_migration(&sql_tx)?, + 13 => break, x => return Err(DBError::Other(format!( "Database schema is newer than supported by this binary. Expected version = {}, Database version = {x}", Self::SCHEMA_VERSION, @@ -1032,19 +1090,27 @@ impl SignerDb { consensus_hash: &ConsensusHash, burn_height: u64, received_time: &SystemTime, + parent_burn_block_hash: &BurnchainHeaderHash, ) -> Result<(), DBError> { let received_ts = received_time .duration_since(std::time::UNIX_EPOCH) .map_err(|e| DBError::Other(format!("Bad system time: {e}")))? 
.as_secs(); - debug!("Inserting burn block info"; "burn_block_height" => burn_height, "burn_hash" => %burn_hash, "received" => received_ts, "ch" => %consensus_hash); + debug!("Inserting burn block info"; + "burn_block_height" => burn_height, + "burn_hash" => %burn_hash, + "received" => received_ts, + "ch" => %consensus_hash, + "parent_burn_block_hash" => %parent_burn_block_hash + ); self.db.execute( - "INSERT OR REPLACE INTO burn_blocks (block_hash, consensus_hash, block_height, received_time) VALUES (?1, ?2, ?3, ?4)", + "INSERT OR REPLACE INTO burn_blocks (block_hash, consensus_hash, block_height, received_time, parent_burn_block_hash) VALUES (?1, ?2, ?3, ?4, ?5)", params![ burn_hash, consensus_hash, u64_to_sql(burn_height)?, u64_to_sql(received_ts)?, + parent_burn_block_hash, ], )?; Ok(()) @@ -1084,6 +1150,26 @@ impl SignerDb { Ok(Some(receive_time)) } + /// Lookup the burn block for a given burn block hash. + pub fn get_burn_block_by_hash( + &self, + burn_block_hash: &BurnchainHeaderHash, + ) -> Result { + let query = + "SELECT block_hash, block_height, consensus_hash, parent_burn_block_hash FROM burn_blocks WHERE block_hash = ?"; + let args = params![burn_block_hash]; + + query_row(&self.db, query, args)?.ok_or(DBError::NotFoundError) + } + + /// Lookup the burn block for a given consensus hash. + pub fn get_burn_block_by_ch(&self, ch: &ConsensusHash) -> Result { + let query = "SELECT block_hash, block_height, consensus_hash, parent_burn_block_hash FROM burn_blocks WHERE consensus_hash = ?"; + let args = params![ch]; + + query_row(&self.db, query, args)?.ok_or(DBError::NotFoundError) + } + /// Insert or replace a block into the database. /// Preserves the `broadcast` column if replacing an existing block. 
pub fn insert_block(&mut self, block_info: &BlockInfo) -> Result<(), DBError> { @@ -1717,8 +1803,14 @@ pub mod tests { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); - db.insert_burn_block(&test_burn_hash, &test_consensus_hash, 10, &stime) - .unwrap(); + db.insert_burn_block( + &test_burn_hash, + &test_consensus_hash, + 10, + &stime, + &test_burn_hash, + ) + .unwrap(); let stored_time = db .get_burn_block_receive_time(&test_burn_hash) diff --git a/stacks-signer/src/tests/chainstate.rs b/stacks-signer/src/tests/chainstate.rs index 422b0c84d4..4569343ea1 100644 --- a/stacks-signer/src/tests/chainstate.rs +++ b/stacks-signer/src/tests/chainstate.rs @@ -210,6 +210,7 @@ fn reorg_timing_testing( consensus_hash: last_sortition.consensus_hash, was_sortition: true, first_block_mined: Some(StacksBlockId([1; 32])), + nakamoto_blocks: None, }, TenureForkingInfo { burn_block_hash: BurnchainHeaderHash([128; 32]), @@ -219,6 +220,7 @@ fn reorg_timing_testing( consensus_hash: view.cur_sortition.parent_tenure_id, was_sortition: true, first_block_mined: Some(StacksBlockId([2; 32])), + nakamoto_blocks: None, }, ]; @@ -256,6 +258,7 @@ fn reorg_timing_testing( &view.cur_sortition.consensus_hash, 3, &sortition_time, + &view.last_sortition.as_ref().unwrap().burn_block_hash, ) .unwrap(); @@ -394,7 +397,13 @@ fn check_block_proposal_timeout() { let burn_height = 1; let received_time = SystemTime::now(); signer_db - .insert_burn_block(&burn_hash, &consensus_hash, burn_height, &received_time) + .insert_burn_block( + &burn_hash, + &consensus_hash, + burn_height, + &received_time, + &view.last_sortition.as_ref().unwrap().burn_block_hash, + ) .unwrap(); view.check_proposal( @@ -466,7 +475,13 @@ fn check_sortition_timeout() { let burn_height = 1; let received_time = SystemTime::now(); signer_db - .insert_burn_block(&burn_hash, &consensus_hash, burn_height, &received_time) + .insert_burn_block( + &burn_hash, + &consensus_hash, + burn_height, + &received_time, + 
&BurnchainHeaderHash([0; 32]), + ) .unwrap(); std::thread::sleep(Duration::from_secs(1)); diff --git a/stacks-signer/src/v0/signer.rs b/stacks-signer/src/v0/signer.rs index 1080b0543a..a594b3c769 100644 --- a/stacks-signer/src/v0/signer.rs +++ b/stacks-signer/src/v0/signer.rs @@ -51,6 +51,7 @@ use crate::client::{ClientError, SignerSlotID, StackerDB, StacksClient}; use crate::config::{SignerConfig, SignerConfigMode}; use crate::runloop::SignerResult; use crate::signerdb::{BlockInfo, BlockState, SignerDb}; +use crate::v0::signer_state::NewBurnBlock; use crate::Signer as SignerTrait; /// A global variable that can be used to make signers repeat their proposal @@ -486,6 +487,7 @@ impl Signer { burn_header_hash, consensus_hash, received_time, + parent_burn_block_hash, } => { info!("{self}: Received a new burn block event for block height {burn_height}"); self.signer_db @@ -494,6 +496,7 @@ impl Signer { consensus_hash, *burn_height, received_time, + parent_burn_block_hash, ) .unwrap_or_else(|e| { error!( @@ -505,7 +508,10 @@ impl Signer { panic!("{self} Failed to write burn block event to signerdb: {e}"); }); self.local_state_machine - .bitcoin_block_arrival(&self.signer_db, stacks_client, &self.proposal_config, Some(*burn_height)) + .bitcoin_block_arrival(&self.signer_db, stacks_client, &self.proposal_config, Some(NewBurnBlock { + burn_block_height: *burn_height, + consensus_hash: *consensus_hash, + })) .unwrap_or_else(|e| error!("{self}: failed to update local state machine for latest bitcoin block arrival"; "err" => ?e)); *sortition_state = None; } diff --git a/stacks-signer/src/v0/signer_state.rs b/stacks-signer/src/v0/signer_state.rs index c14cb775d6..eab3ff9051 100644 --- a/stacks-signer/src/v0/signer_state.rs +++ b/stacks-signer/src/v0/signer_state.rs @@ -18,7 +18,7 @@ use std::time::{Duration, UNIX_EPOCH}; use blockstack_lib::chainstate::burn::ConsensusHashExtensions; use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader}; -use 
blockstack_lib::chainstate::stacks::StacksTransaction; +use blockstack_lib::chainstate::stacks::{StacksTransaction, TransactionPayload}; use clarity::types::chainstate::StacksAddress; use libsigner::v0::messages::{ MessageSlotID, SignerMessage, StateMachineUpdate as StateMachineUpdateMessage, @@ -350,8 +350,17 @@ pub enum LocalStateMachine { /// A pending update for a signer state machine #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum StateMachineUpdate { - /// A new burn block at height u64 is expected - BurnBlock(u64), + /// A new burn block is expected + BurnBlock(NewBurnBlock), +} + +/// Minimal struct for a new burn block +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct NewBurnBlock { + /// The height of the new burn block + pub burn_block_height: u64, + /// The hash of the new burn block + pub consensus_hash: ConsensusHash, } impl LocalStateMachine { @@ -410,7 +419,7 @@ impl LocalStateMachine { }, other => { return Err(CodecError::DeserializeError(format!( - "Active signer ptocol version is unknown: {other}" + "Active signer protocol version is unknown: {other}" ))) } }; @@ -646,6 +655,11 @@ impl LocalStateMachine { } }; + // No matter what, if we're in tx replay mode, remove the tx replay set + // TODO: in later versions, we will only clear the tx replay + // set when replay is completed. + prior_state_machine.tx_replay_set = None; + let MinerState::ActiveMiner { parent_tenure_id, parent_tenure_last_block, @@ -727,7 +741,7 @@ impl LocalStateMachine { db: &SignerDb, client: &StacksClient, proposal_config: &ProposalEvalConfig, - mut expected_burn_height: Option, + mut expected_burn_block: Option, ) -> Result<(), SignerChainstateError> { // set self to uninitialized so that if this function errors, // self is left as uninitialized. @@ -741,31 +755,56 @@ impl LocalStateMachine { // but if we have other kinds of pending updates, this logic will need // to be changed. 
match update { - StateMachineUpdate::BurnBlock(pending_burn_height) => { - if pending_burn_height > expected_burn_height.unwrap_or(0) { - expected_burn_height = Some(pending_burn_height); + StateMachineUpdate::BurnBlock(pending_burn_block) => { + match expected_burn_block { + None => expected_burn_block = Some(pending_burn_block), + Some(ref expected) => { + if pending_burn_block.burn_block_height > expected.burn_block_height + { + expected_burn_block = Some(pending_burn_block); + } + } } } } - prior + prior.clone() } }; let peer_info = client.get_peer_info()?; let next_burn_block_height = peer_info.burn_block_height; let next_burn_block_hash = peer_info.pox_consensus; - - if let Some(expected_burn_height) = expected_burn_height { - if next_burn_block_height < expected_burn_height { + let mut tx_replay_set = prior_state_machine.tx_replay_set.clone(); + + if let Some(expected_burn_block) = expected_burn_block { + // If the next height is less than the expected height, we need to wait. + // OR if the next height is the same, but with a different hash, we need to wait. + let node_behind_expected = + next_burn_block_height < expected_burn_block.burn_block_height; + let node_on_equal_fork = next_burn_block_height + == expected_burn_block.burn_block_height + && next_burn_block_hash != expected_burn_block.consensus_hash; + if node_behind_expected || node_on_equal_fork { + let err_msg = format!( + "Node has not processed the next burn block yet. 
Expected height = {}, Expected consensus hash = {}",
+                expected_burn_block.burn_block_height,
+                expected_burn_block.consensus_hash,
+            );
             *self = Self::Pending {
-                update: StateMachineUpdate::BurnBlock(expected_burn_height),
+                update: StateMachineUpdate::BurnBlock(expected_burn_block),
                 prior: prior_state_machine,
             };
-            return Err(ClientError::InvalidResponse(
-                "Node has not processed the next burn block yet".into(),
-            )
-            .into());
+            return Err(ClientError::InvalidResponse(err_msg).into());
+        }
+        if let Some(new_replay_set) = self.handle_possible_bitcoin_fork(
+            db,
+            client,
+            &expected_burn_block,
+            &prior_state_machine,
+            tx_replay_set.is_some(),
+        )? {
+            tx_replay_set = Some(new_replay_set);
         }
     }
 
@@ -809,7 +848,7 @@ impl LocalStateMachine {
             burn_block_height: next_burn_block_height,
             current_miner: miner_state,
             active_signer_protocol_version: prior_state_machine.active_signer_protocol_version,
-            tx_replay_set: prior_state_machine.tx_replay_set,
+            tx_replay_set,
         });
 
         if prior_state != *self {
@@ -981,4 +1020,74 @@ impl LocalStateMachine {
         };
         state.tx_replay_set.clone()
     }
+
+    /// Handle a possible bitcoin fork. If a fork is detected,
+    /// return the transactions that should be replayed.
+ pub fn handle_possible_bitcoin_fork( + &self, + db: &SignerDb, + client: &StacksClient, + expected_burn_block: &NewBurnBlock, + prior_state_machine: &SignerStateMachine, + is_in_tx_replay_mode: bool, + ) -> Result>, SignerChainstateError> { + if expected_burn_block.burn_block_height > prior_state_machine.burn_block_height { + // no bitcoin fork, because we're advancing the burn block height + return Ok(None); + } + if expected_burn_block.consensus_hash == prior_state_machine.burn_block { + // no bitcoin fork, because we're at the same burn block hash as before + return Ok(None); + } + if is_in_tx_replay_mode { + // TODO: handle fork while still in replay + info!("Detected bitcoin fork while in replay mode, will not try to handle the fork"); + return Ok(None); + } + info!("Signer State: fork detected"; + "expected_burn_block.height" => expected_burn_block.burn_block_height, + "expected_burn_block.hash" => %expected_burn_block.consensus_hash, + "prior_state_machine.burn_block_height" => prior_state_machine.burn_block_height, + "prior_state_machine.burn_block" => %prior_state_machine.burn_block, + ); + // Determine the tenures that were forked + let mut parent_burn_block_info = + db.get_burn_block_by_ch(&prior_state_machine.burn_block)?; + let last_forked_tenure = prior_state_machine.burn_block; + let mut first_forked_tenure = prior_state_machine.burn_block; + let mut forked_tenures = vec![( + prior_state_machine.burn_block, + prior_state_machine.burn_block_height, + )]; + while parent_burn_block_info.block_height > expected_burn_block.burn_block_height { + parent_burn_block_info = + db.get_burn_block_by_hash(&parent_burn_block_info.parent_burn_block_hash)?; + first_forked_tenure = parent_burn_block_info.consensus_hash; + forked_tenures.push(( + parent_burn_block_info.consensus_hash, + parent_burn_block_info.block_height, + )); + } + let fork_info = + client.get_tenure_forking_info(&first_forked_tenure, &last_forked_tenure)?; + let mut forked_blocks = fork_info + 
.iter() + .flat_map(|fork_info| fork_info.nakamoto_blocks.iter().flatten()) + .collect::>(); + forked_blocks.sort_by_key(|block| block.header.chain_length); + let forked_txs = forked_blocks + .iter() + .flat_map(|block| block.txs.iter()) + .filter(|tx| + // Don't include Coinbase, TenureChange, or PoisonMicroblock transactions + !matches!( + tx.payload, + TransactionPayload::TenureChange(..) + | TransactionPayload::Coinbase(..) + | TransactionPayload::PoisonMicroblock(..) + )) + .cloned() + .collect::>(); + Ok(Some(forked_txs)) + } } diff --git a/stackslib/src/chainstate/nakamoto/staging_blocks.rs b/stackslib/src/chainstate/nakamoto/staging_blocks.rs index 3505d41ebd..b2164cb405 100644 --- a/stackslib/src/chainstate/nakamoto/staging_blocks.rs +++ b/stackslib/src/chainstate/nakamoto/staging_blocks.rs @@ -165,7 +165,12 @@ pub const NAKAMOTO_STAGING_DB_SCHEMA_4: &[&str] = &[ r#"UPDATE db_version SET version = 4"#, ]; -pub const NAKAMOTO_STAGING_DB_SCHEMA_LATEST: u32 = 4; +pub const NAKAMOTO_STAGING_DB_SCHEMA_5: &[&str] = &[ + r#"CREATE INDEX nakamoto_staging_blocks_by_consensus_hash_and_processed ON nakamoto_staging_blocks(consensus_hash, processed);"#, + r#"UPDATE db_version SET version = 5"#, +]; + +pub const NAKAMOTO_STAGING_DB_SCHEMA_LATEST: u32 = 5; pub struct NakamotoStagingBlocksConn(rusqlite::Connection); @@ -433,6 +438,30 @@ impl<'a> NakamotoStagingBlocksConnRef<'a> { .collect()) } + /// Get all nakamoto blocks in a tenure + pub fn get_nakamoto_blocks_in_tenure( + &self, + consensus_hash: &ConsensusHash, + ) -> Result, ChainstateError> { + let qry = + "SELECT data FROM nakamoto_staging_blocks WHERE consensus_hash = ?1 AND processed = 1"; + let args = params![consensus_hash]; + let block_data: Vec> = query_rows(self, qry, args)?; + Ok(block_data + .into_iter() + .filter_map(|block_vec| { + NakamotoBlock::consensus_deserialize(&mut &block_vec[..]) + .map_err(|e| { + error!("Failed to deserialize block from DB, likely database corruption"; + "consensus_hash" => 
%consensus_hash, + "error" => ?e); + e + }) + .ok() + }) + .collect()) + } + /// Find the next ready-to-process Nakamoto block, given a connection to the staging blocks DB. /// NOTE: the relevant field queried from `nakamoto_staging_blocks` are updated by a separate /// tx from block-processing, so it's imperative that the thread that calls this function is @@ -810,6 +839,15 @@ impl StacksChainState { assert_eq!(version, 4, "Nakamoto staging DB migration failure"); debug!("Migrated Nakamoto staging blocks DB to schema 3"); } + 4 => { + debug!("Migrate Nakamoto staging blocks DB to schema 5"); + for cmd in NAKAMOTO_STAGING_DB_SCHEMA_5.iter() { + conn.execute(cmd, NO_PARAMS)?; + } + let version = Self::get_nakamoto_staging_blocks_db_version(conn)?; + assert_eq!(version, 5, "Nakamoto staging DB migration failure"); + debug!("Migrated Nakamoto staging blocks DB to schema 5"); + } NAKAMOTO_STAGING_DB_SCHEMA_LATEST => { break; } diff --git a/stackslib/src/chainstate/stacks/mod.rs b/stackslib/src/chainstate/stacks/mod.rs index 115383d566..2e7d267239 100644 --- a/stackslib/src/chainstate/stacks/mod.rs +++ b/stackslib/src/chainstate/stacks/mod.rs @@ -1012,14 +1012,6 @@ pub struct StacksTransaction { pub payload: TransactionPayload, } -impl Hash for StacksTransaction { - fn hash(&self, state: &mut H) { - self.txid().hash(state) - } -} - -impl Eq for StacksTransaction {} - #[derive(Debug, Clone, PartialEq)] pub struct StacksTransactionSigner { pub tx: StacksTransaction, diff --git a/stackslib/src/chainstate/stacks/transaction.rs b/stackslib/src/chainstate/stacks/transaction.rs index effa9e8ab4..80bdf8a909 100644 --- a/stackslib/src/chainstate/stacks/transaction.rs +++ b/stackslib/src/chainstate/stacks/transaction.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
+use std::hash::{Hash, Hasher}; use std::io; use std::io::prelude::*; use std::io::{Read, Write}; @@ -683,6 +684,14 @@ impl StacksTransaction { } } +impl Hash for StacksTransaction { + fn hash(&self, state: &mut H) { + self.txid().hash(state); + } +} + +impl Eq for StacksTransaction {} + impl StacksMessageCodec for StacksTransaction { fn consensus_serialize(&self, fd: &mut W) -> Result<(), codec_error> { write_next(fd, &(self.version as u8))?; diff --git a/stackslib/src/net/api/get_tenures_fork_info.rs b/stackslib/src/net/api/get_tenures_fork_info.rs index 2cb2847290..6958678bf6 100644 --- a/stackslib/src/net/api/get_tenures_fork_info.rs +++ b/stackslib/src/net/api/get_tenures_fork_info.rs @@ -34,7 +34,7 @@ use crate::chainstate::nakamoto::{NakamotoBlock, NakamotoChainState, NakamotoSta use crate::chainstate::stacks::db::StacksChainState; use crate::chainstate::stacks::Error as ChainError; use crate::net::api::getblock_v3::NakamotoBlockStream; -use crate::net::api::{prefix_hex, prefix_opt_hex}; +use crate::net::api::{prefix_hex, prefix_opt_hex, prefix_opt_hex_codec}; use crate::net::http::{ parse_bytes, parse_json, Error, HttpBadRequest, HttpChunkGenerator, HttpContentType, HttpNotFound, HttpRequest, HttpRequestContents, HttpRequestPreamble, HttpResponse, @@ -80,6 +80,9 @@ pub struct TenureForkingInfo { /// tenure's first block. #[serde(with = "prefix_opt_hex")] pub first_block_mined: Option, + /// Nakamoto blocks in the tenure + #[serde(with = "prefix_opt_hex_codec")] + pub nakamoto_blocks: Option>, } #[derive(Clone, Default)] @@ -154,8 +157,8 @@ impl TenureForkingInfo { chainstate: &StacksChainState, tip_block_id: &StacksBlockId, ) -> Result { - let first_block_mined = if !sn.sortition { - None + let (first_block_mined, nakamoto_blocks) = if !sn.sortition { + (None, None) } else { // is this a nakamoto sortition? 
let epoch = SortitionDB::get_stacks_epoch(sortdb.conn(), sn.block_height)?.ok_or_else( @@ -168,18 +171,28 @@ impl TenureForkingInfo { }, )?; if epoch.epoch_id < StacksEpochId::Epoch30 { - StacksChainState::get_stacks_block_header_info_by_consensus_hash( - chainstate.db(), - &sn.consensus_hash, - )? - .map(|header| header.index_block_hash()) + ( + StacksChainState::get_stacks_block_header_info_by_consensus_hash( + chainstate.db(), + &sn.consensus_hash, + )? + .map(|header| header.index_block_hash()), + None, + ) } else { - NakamotoChainState::get_nakamoto_tenure_start_block_header( - &mut chainstate.index_conn(), - tip_block_id, - &sn.consensus_hash, - )? - .map(|header| header.index_block_hash()) + ( + NakamotoChainState::get_nakamoto_tenure_start_block_header( + &mut chainstate.index_conn(), + tip_block_id, + &sn.consensus_hash, + )? + .map(|header| header.index_block_hash()), + Some( + chainstate + .nakamoto_blocks_db() + .get_nakamoto_blocks_in_tenure(&sn.consensus_hash)?, + ), + ) } }; Ok(TenureForkingInfo { @@ -190,6 +203,7 @@ impl TenureForkingInfo { consensus_hash: sn.consensus_hash.clone(), was_sortition: sn.sortition, first_block_mined, + nakamoto_blocks, }) } } diff --git a/stackslib/src/net/api/mod.rs b/stackslib/src/net/api/mod.rs index 45fbdda42f..ab5b647834 100644 --- a/stackslib/src/net/api/mod.rs +++ b/stackslib/src/net/api/mod.rs @@ -24,6 +24,7 @@ use stacks_common::util::hash::{Hash160, Sha512Trunc256Sum}; use stacks_common::util::HexError; use crate::burnchains::Txid; +use crate::chainstate::nakamoto::NakamotoBlock; use crate::chainstate::stacks::{StacksMicroblock, StacksTransaction}; use crate::core::mempool; use crate::cost_estimates::FeeRateEstimate; @@ -256,6 +257,51 @@ pub mod prefix_hex_codec { } } +/// This module serde encode and decodes structs that +/// implement StacksMessageCodec as a 0x-prefixed hex string. +/// This is the same as prefix_hex_codec, but for Option. 
+pub mod prefix_opt_hex_codec { + use clarity::codec::StacksMessageCodec; + use clarity::util::hash::{hex_bytes, to_hex}; + + use super::prefix_hex_codec; + + pub fn serialize( + val: &Option, + s: S, + ) -> Result { + match val { + Some(ref some_val) => { + let mut bytes = vec![]; + some_val + .consensus_serialize(&mut bytes) + .map_err(serde::ser::Error::custom)?; + let hex_string = format!("0x{}", to_hex(&bytes)); + s.serialize_some(&hex_string) + } + None => s.serialize_none(), + } + } + + pub fn deserialize<'de, D: serde::Deserializer<'de>, T: StacksMessageCodec>( + d: D, + ) -> Result, D::Error> { + let opt_inst_str: Option = serde::Deserialize::deserialize(d)?; + let Some(inst_string) = opt_inst_str else { + return Ok(None); + }; + let Some(hex_str) = inst_string.get(2..) else { + return Err(serde::de::Error::invalid_length( + inst_string.len(), + &"at least length 2 string", + )); + }; + let bytes = hex_bytes(hex_str).map_err(serde::de::Error::custom)?; + let val = T::consensus_deserialize(&mut &bytes[..]).map_err(serde::de::Error::custom)?; + Ok(Some(val)) + } +} + pub trait HexDeser: Sized { fn try_from(hex: &str) -> Result; } diff --git a/testnet/stacks-node/src/tests/signer/mod.rs b/testnet/stacks-node/src/tests/signer/mod.rs index 83091b78de..3ff80f8e55 100644 --- a/testnet/stacks-node/src/tests/signer/mod.rs +++ b/testnet/stacks-node/src/tests/signer/mod.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; use clarity::boot_util::boot_code_id; use clarity::vm::types::PrincipalData; +use clarity::vm::Value; use libsigner::v0::messages::{ BlockAccepted, BlockResponse, MessageSlotID, PeerInfo, SignerMessage, }; @@ -451,43 +452,36 @@ impl + Send + 'static, T: SignerEventTrait + 'static> SignerTest Result { + contract_code: &str, + contract_name: &str, + ) -> Result<(String, u64), String> { let http_origin = format!("http://{}", &self.running_nodes.conf.node.rpc_bind); let sender_addr = to_addr(&sender_sk); let sender_nonce = get_account(&http_origin, 
&sender_addr).nonce; - let burn_height_contract = " - (define-data-var local-burn-block-ht uint u0) - (define-public (run-update) - (ok (var-set local-burn-block-ht burn-block-height))) - "; + let contract_tx = make_contract_publish( &sender_sk, - 0, + sender_nonce, 1000, self.running_nodes.conf.burnchain.chain_id, - "burn-height-local", - burn_height_contract, + contract_name, + contract_code, ); - let txid = submit_tx_fallible(&http_origin, &contract_tx)?; - - wait_for(120, || { - let next_nonce = get_account(&http_origin, &sender_addr).nonce; - Ok(next_nonce > sender_nonce) - }) - .map(|()| txid) + submit_tx_fallible(&http_origin, &contract_tx).map(|resp| (resp, sender_nonce)) } - /// Submit a burn block dependent contract-call - /// and wait until it is included in a block - pub fn submit_burn_block_call_and_wait( + /// Submit a contract call and return (txid, sender_nonce) + pub fn submit_contract_call( &mut self, sender_sk: &StacksPrivateKey, - ) -> Result { + contract_name: &str, + contract_func: &str, + contract_args: &[Value], + ) -> Result<(String, u64), String> { let http_origin = format!("http://{}", &self.running_nodes.conf.node.rpc_bind); let sender_addr = to_addr(&sender_sk); let sender_nonce = get_account(&http_origin, &sender_addr).nonce; @@ -497,17 +491,54 @@ impl + Send + 'static, T: SignerEventTrait + 'static> SignerTest Result<(), String> { + let http_origin = format!("http://{}", &self.running_nodes.conf.node.rpc_bind); wait_for(120, || { let next_nonce = get_account(&http_origin, &sender_addr).nonce; Ok(next_nonce > sender_nonce) }) - .map(|()| txid) + } + + /// Submit a burn block dependent contract for publishing + /// and wait until it is included in a block + pub fn submit_burn_block_contract_and_wait( + &mut self, + sender_sk: &StacksPrivateKey, + ) -> Result { + let burn_height_contract = " + (define-data-var local-burn-block-ht uint u0) + (define-public (run-update) + (ok (var-set local-burn-block-ht burn-block-height))) + "; + let 
(txid, sender_nonce) = + self.submit_contract_deploy(sender_sk, burn_height_contract, "burn-height-local")?; + + self.wait_for_nonce_increase(&to_addr(&sender_sk), sender_nonce)?; + Ok(txid) + } + + /// Submit a burn block dependent contract-call + /// and wait until it is included in a block + pub fn submit_burn_block_call_and_wait( + &mut self, + sender_sk: &StacksPrivateKey, + ) -> Result { + let (txid, sender_nonce) = + self.submit_contract_call(sender_sk, "burn-height-local", "run-update", &[])?; + + self.wait_for_nonce_increase(&to_addr(&sender_sk), sender_nonce)?; + Ok(txid) } /// Get the local state machines and most recent peer info from the stacks-node, diff --git a/testnet/stacks-node/src/tests/signer/v0.rs b/testnet/stacks-node/src/tests/signer/v0.rs index 2a5fd3bfd5..87ddeb8871 100644 --- a/testnet/stacks-node/src/tests/signer/v0.rs +++ b/testnet/stacks-node/src/tests/signer/v0.rs @@ -80,7 +80,7 @@ use stacks_signer::client::{SignerSlotID, StackerDB}; use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network}; use stacks_signer::signerdb::SignerDb; use stacks_signer::v0::signer::TEST_REPEAT_PROPOSAL_RESPONSE; -use stacks_signer::v0::signer_state::SUPPORTED_SIGNER_PROTOCOL_VERSION; +use stacks_signer::v0::signer_state::{LocalStateMachine, SUPPORTED_SIGNER_PROTOCOL_VERSION}; use stacks_signer::v0::tests::{ TEST_IGNORE_ALL_BLOCK_PROPOSALS, TEST_PAUSE_BLOCK_BROADCAST, TEST_PIN_SUPPORTED_SIGNER_PROTOCOL_VERSION, TEST_REJECT_ALL_BLOCK_PROPOSAL, @@ -3036,6 +3036,349 @@ fn bitcoind_forking_test() { signer_test.shutdown(); } +#[test] +#[ignore] +/// Trigger a Bitcoin fork and ensure that the signer +/// both detects the fork and moves into a tx replay state +/// +/// The test flow is: +/// +/// - Mine 10 tenures after epoch 3 +/// - Include a STX transfer in the 10th tenure +/// - Trigger a Bitcoin fork (3 blocks) +/// - Verify that the signer moves into tx replay state +/// - Verify that the signer correctly includes the stx 
transfer +/// in the tx replay set +/// +/// Then, a second fork scenario is tested, which +/// includes multiple txs across multiple tenures. +fn tx_replay_forking_test() { + if env::var("BITCOIND_TEST") != Ok("1".into()) { + return; + } + + let num_signers = 5; + let sender_sk = Secp256k1PrivateKey::random(); + let sender_addr = tests::to_addr(&sender_sk); + let send_amt = 100; + let send_fee = 180; + let mut signer_test: SignerTest = SignerTest::new_with_config_modifications( + num_signers, + vec![(sender_addr, (send_amt + send_fee) * 10)], + |_| {}, + |node_config| { + node_config.miner.block_commit_delay = Duration::from_secs(1); + }, + None, + None, + ); + let conf = signer_test.running_nodes.conf.clone(); + let http_origin = format!("http://{}", &conf.node.rpc_bind); + + signer_test.boot_to_epoch_3(); + info!("------------------------- Reached Epoch 3.0 -------------------------"); + let pre_fork_tenures = 10; + + for i in 0..pre_fork_tenures { + info!("Mining pre-fork tenure {} of {pre_fork_tenures}", i + 1); + signer_test.mine_nakamoto_block(Duration::from_secs(30), true); + } + + signer_test.check_signer_states_normal(); + + let burn_blocks = test_observer::get_burn_blocks(); + let forked_blocks = burn_blocks.iter().rev().take(2).collect::>(); + let last_forked_tenure: ConsensusHash = hex_bytes( + &forked_blocks[0] + .get("consensus_hash") + .unwrap() + .as_str() + .unwrap()[2..], + ) + .unwrap() + .as_slice() + .into(); + let first_forked_tenure: ConsensusHash = hex_bytes( + &forked_blocks[1] + .get("consensus_hash") + .unwrap() + .as_str() + .unwrap()[2..], + ) + .unwrap() + .as_slice() + .into(); + + let tip = get_chain_info(&signer_test.running_nodes.conf); + // Make a transfer tx (this will get forked) + let (txid, _) = signer_test + .submit_transfer_tx(&sender_sk, send_fee, send_amt) + .unwrap(); + + wait_for(30, || { + let new_tip = get_chain_info(&signer_test.running_nodes.conf); + Ok(new_tip.stacks_tip_height > tip.stacks_tip_height) + }) + 
.expect("Timed out waiting for transfer tx to be mined"); + + let pre_fork_1_nonce = get_account(&http_origin, &sender_addr).nonce; + assert_eq!(pre_fork_1_nonce, 1); + + info!("------------------------- Triggering Bitcoin Fork -------------------------"); + + let burn_header_hash_to_fork = signer_test + .running_nodes + .btc_regtest_controller + .get_block_hash(tip.burn_block_height - 2); + signer_test + .running_nodes + .btc_regtest_controller + .invalidate_block(&burn_header_hash_to_fork); + signer_test + .running_nodes + .btc_regtest_controller + .build_next_block(3); + + // note, we should still have normal signer states! + signer_test.check_signer_states_normal(); + + info!("Wait for block off of shallow fork"); + + TEST_MINE_STALL.set(true); + + let submitted_commits = signer_test + .running_nodes + .counters + .naka_submitted_commits + .clone(); + + let fork_info = signer_test + .stacks_client + .get_tenure_forking_info(&first_forked_tenure, &last_forked_tenure) + .unwrap(); + + info!("---- Fork info: {fork_info:?} ----"); + + for fork in fork_info { + info!("---- Fork: {} ----", fork.consensus_hash); + fork.nakamoto_blocks.inspect(|blocks| { + for block in blocks { + info!("---- Block: {block:?} ----"); + } + }); + } + + // we need to mine some blocks to get back to being considered a frequent miner + for i in 0..3 { + let current_burn_height = get_chain_info(&signer_test.running_nodes.conf).burn_block_height; + info!( + "Mining block #{i} to be considered a frequent miner"; + "current_burn_height" => current_burn_height, + ); + let commits_count = submitted_commits.load(Ordering::SeqCst); + next_block_and_controller( + &mut signer_test.running_nodes.btc_regtest_controller, + 60, + |_btc_controller| { + let commits_submitted = submitted_commits.load(Ordering::SeqCst); + Ok(commits_submitted > commits_count) + }, + ) + .unwrap(); + } + + let post_fork_1_nonce = get_account(&http_origin, &sender_addr).nonce; + + let burn_blocks = 
test_observer::get_burn_blocks().clone(); + + for block in burn_blocks { + let height = block.get("burn_block_height").unwrap().as_number().unwrap(); + if height.as_u64().unwrap() < 230 { + continue; + } + let consensus_hash = block.get("consensus_hash").unwrap().as_str().unwrap(); + info!("---- Burn Block {height} {consensus_hash} ----"); + } + + let (signer_states, _) = signer_test.get_burn_updated_states(); + for state in signer_states { + match state { + LocalStateMachine::Initialized(signer_state_machine) => { + let Some(tx_replay_set) = signer_state_machine.tx_replay_set else { + panic!( + "Signer state machine is in tx replay state, but tx replay set is not set" + ); + }; + info!("---- Tx replay set: {:?} ----", tx_replay_set); + assert_eq!(tx_replay_set.len(), 1); + assert_eq!(tx_replay_set[0].txid().to_hex(), txid); + } + _ => { + panic!("Signer state is not in the initialized state"); + } + } + } + + // We should have forked 1 tx + assert_eq!(post_fork_1_nonce, pre_fork_1_nonce - 1); + + TEST_MINE_STALL.set(false); + + info!("---- Mining post-fork block to clear tx replay set ----"); + + // Now, make a new stacks block, which should clear the tx replay set + signer_test.mine_nakamoto_block(Duration::from_secs(30), true); + let (signer_states, _) = signer_test.get_burn_updated_states(); + for state in signer_states { + assert!( + state.get_tx_replay_set().is_none(), + "Signer state is in tx replay state, when it shouldn't be" + ); + } + + // Now, we'll trigger another fork, with more txs, across tenures + + // The forked blocks are: + // Tenure 1: + // - Block with stx transfer + // Tenure 2: + // - Block with contract deploy + // - Block with contract call + + signer_test.mine_nakamoto_block(Duration::from_secs(30), true); + + let pre_fork_2_tip = get_chain_info(&signer_test.running_nodes.conf); + + let contract_code = " + (define-public (call-fn) + (ok true) + ) + "; + let contract_name = "test-contract"; + + let (transfer_txid, transfer_nonce) = 
signer_test + .submit_transfer_tx(&sender_sk, send_fee, send_amt) + .expect("Failed to submit transfer tx"); + signer_test + .wait_for_nonce_increase(&sender_addr, transfer_nonce) + .expect("Failed to wait for nonce increase"); + signer_test.mine_nakamoto_block(Duration::from_secs(30), true); + + let (contract_deploy_txid, deploy_nonce) = signer_test + .submit_contract_deploy(&sender_sk, contract_code, contract_name) + .expect("Failed to submit contract deploy"); + signer_test + .wait_for_nonce_increase(&sender_addr, deploy_nonce) + .expect("Failed to wait for nonce increase"); + + let (contract_call_txid, contract_call_nonce) = signer_test + .submit_contract_call(&sender_sk, contract_name, "call-fn", &[]) + .expect("Failed to submit contract call"); + signer_test + .wait_for_nonce_increase(&sender_addr, contract_call_nonce) + .expect("Failed to wait for nonce increase"); + signer_test.mine_nakamoto_block(Duration::from_secs(30), true); + + TEST_MINE_STALL.set(true); + + let burn_header_hash_to_fork = signer_test + .running_nodes + .btc_regtest_controller + .get_block_hash(pre_fork_2_tip.burn_block_height); + signer_test + .running_nodes + .btc_regtest_controller + .invalidate_block(&burn_header_hash_to_fork); + signer_test + .running_nodes + .btc_regtest_controller + .build_next_block(3); + + let burn_blocks = test_observer::get_burn_blocks(); + let forked_blocks = burn_blocks.iter().rev().take(2).collect::>(); + let last_forked_tenure: ConsensusHash = hex_bytes( + &forked_blocks[0] + .get("consensus_hash") + .unwrap() + .as_str() + .unwrap()[2..], + ) + .unwrap() + .as_slice() + .into(); + let first_forked_tenure: ConsensusHash = hex_bytes( + &forked_blocks[1] + .get("consensus_hash") + .unwrap() + .as_str() + .unwrap()[2..], + ) + .unwrap() + .as_slice() + .into(); + + let fork_info = signer_test + .stacks_client + .get_tenure_forking_info(&first_forked_tenure, &last_forked_tenure) + .unwrap(); + + info!("---- Fork info: {fork_info:?} ----"); + + for fork in 
fork_info { + info!("---- Fork: {} ----", fork.consensus_hash); + fork.nakamoto_blocks.inspect(|blocks| { + for block in blocks { + info!("---- Block: {} ----", block.header.chain_length); + } + }); + } + + for i in 0..3 { + let current_burn_height = get_chain_info(&signer_test.running_nodes.conf).burn_block_height; + info!( + "Mining block #{i} to be considered a frequent miner"; + "current_burn_height" => current_burn_height, + ); + let commits_count = submitted_commits.load(Ordering::SeqCst); + next_block_and_controller( + &mut signer_test.running_nodes.btc_regtest_controller, + 60, + |_btc_controller| { + let commits_submitted = submitted_commits.load(Ordering::SeqCst); + Ok(commits_submitted > commits_count) + }, + ) + .unwrap(); + } + + let expected_tx_replay_txids = vec![transfer_txid, contract_deploy_txid, contract_call_txid]; + + let (signer_states, _) = signer_test.get_burn_updated_states(); + for state in signer_states { + match state { + LocalStateMachine::Initialized(signer_state_machine) => { + let Some(tx_replay_set) = signer_state_machine.tx_replay_set else { + panic!( + "Signer state machine is in tx replay state, but tx replay set is not set" + ); + }; + info!("---- Tx replay set: {:?} ----", tx_replay_set); + assert_eq!(tx_replay_set.len(), expected_tx_replay_txids.len()); + let state_replay_txids = tx_replay_set + .iter() + .map(|tx| tx.txid().to_hex()) + .collect::>(); + assert_eq!(state_replay_txids, expected_tx_replay_txids); + } + _ => { + panic!("Signer state is not in the initialized state"); + } + } + } + + signer_test.shutdown(); +} + #[test] #[ignore] fn multiple_miners() {