Skip to content

Feat/signer two phase commit #6082

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE

### Added

- Added `SignerMessage::PreBlockCommit` for 2-phase commit block signing
- Added field `vm_error` to EventObserver transaction outputs
- Added new `ValidateRejectCode` values to the `/v3/block_proposal` endpoint
- Added `StateMachineUpdateContent::V1` to support a vector of `StacksTransaction` expected to be replayed in subsequent Stacks blocks
Expand Down
21 changes: 18 additions & 3 deletions libsigner/src/v0/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ MessageSlotID {
/// Block Response message from signers
BlockResponse = 1,
/// Signer State Machine Update
StateMachineUpdate = 2
StateMachineUpdate = 2,
/// Block Pre-commit message from signers before they commit to a block response
BlockPreCommit = 3
});

define_u8_enum!(
Expand Down Expand Up @@ -132,7 +134,9 @@ SignerMessageTypePrefix {
/// Mock block message from Epoch 2.5 miners
MockBlock = 5,
/// State machine update
StateMachineUpdate = 6
StateMachineUpdate = 6,
/// Block Pre-commit message
BlockPreCommit = 7
});

#[cfg_attr(test, mutants::skip)]
Expand All @@ -155,7 +159,7 @@ impl MessageSlotID {
#[cfg_attr(test, mutants::skip)]
impl Display for MessageSlotID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}({})", self, self.to_u8())
write!(f, "{self:?}({})", self.to_u8())
}
}

Expand All @@ -179,6 +183,7 @@ impl From<&SignerMessage> for SignerMessageTypePrefix {
SignerMessage::MockSignature(_) => SignerMessageTypePrefix::MockSignature,
SignerMessage::MockBlock(_) => SignerMessageTypePrefix::MockBlock,
SignerMessage::StateMachineUpdate(_) => SignerMessageTypePrefix::StateMachineUpdate,
SignerMessage::BlockPreCommit(_) => SignerMessageTypePrefix::BlockPreCommit,
}
}
}
Expand All @@ -200,6 +205,8 @@ pub enum SignerMessage {
MockBlock(MockBlock),
/// A state machine update
StateMachineUpdate(StateMachineUpdate),
/// The pre commit message from signers for other signers to observe
BlockPreCommit(Sha512Trunc256Sum),
}

impl SignerMessage {
Expand All @@ -215,6 +222,7 @@ impl SignerMessage {
| Self::MockBlock(_) => None,
Self::BlockResponse(_) | Self::MockSignature(_) => Some(MessageSlotID::BlockResponse), // Mock signature uses the same slot as block response since its exclusively for epoch 2.5 testing
Self::StateMachineUpdate(_) => Some(MessageSlotID::StateMachineUpdate),
Self::BlockPreCommit(_) => Some(MessageSlotID::BlockPreCommit),
}
}
}
Expand All @@ -234,6 +242,9 @@ impl StacksMessageCodec for SignerMessage {
SignerMessage::StateMachineUpdate(state_machine_update) => {
state_machine_update.consensus_serialize(fd)
}
SignerMessage::BlockPreCommit(signer_signature_hash) => {
signer_signature_hash.consensus_serialize(fd)
}
}?;
Ok(())
}
Expand Down Expand Up @@ -271,6 +282,10 @@ impl StacksMessageCodec for SignerMessage {
let state_machine_update = StacksMessageCodec::consensus_deserialize(fd)?;
SignerMessage::StateMachineUpdate(state_machine_update)
}
SignerMessageTypePrefix::BlockPreCommit => {
let signer_signature_hash = StacksMessageCodec::consensus_deserialize(fd)?;
SignerMessage::BlockPreCommit(signer_signature_hash)
}
};
Ok(message)
}
Expand Down
4 changes: 4 additions & 0 deletions stacks-signer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE

## [Unreleased]

### Added

- Added `SignerMessage::BlockPreCommit` message handling; signers now collect until a threshold is reached before issuing a block signature, implementing a proper 2-phase commit.

### Changed

- Upgraded `SUPPORTED_SIGNER_PROTOCOL_VERSION` to 1
Expand Down
8 changes: 8 additions & 0 deletions stacks-signer/src/monitoring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ pub mod actions {
BLOCK_RESPONSES_SENT.with_label_values(&[label_value]).inc();
}

/// Increment the block pre-commit sent counter
pub fn increment_block_pre_commits_sent() {
BLOCK_PRE_COMMITS_SENT.inc();
}

/// Increment the number of block proposals received
pub fn increment_block_proposals_received() {
BLOCK_PROPOSALS_RECEIVED.inc();
Expand Down Expand Up @@ -203,6 +208,9 @@ pub mod actions {
/// Increment the block responses sent counter
pub fn increment_block_responses_sent(_accepted: bool) {}

/// Increment the block pre-commits sent counter
pub fn increment_block_pre_commits_sent() {}

/// Increment the number of block proposals received
pub fn increment_block_proposals_received() {}

Expand Down
5 changes: 5 additions & 0 deletions stacks-signer/src/monitoring/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ lazy_static! {
&["response_type"]
)
.unwrap();
pub static ref BLOCK_PRE_COMMITS_SENT: IntCounter = register_int_counter!(opts!(
"stacks_signer_block_pre_commits_sent",
"The number of block pre-commits sent by the signer"
))
.unwrap();
pub static ref BLOCK_PROPOSALS_RECEIVED: IntCounter = register_int_counter!(opts!(
"stacks_signer_block_proposals_received",
"The number of block proposals received by the signer"
Expand Down
114 changes: 112 additions & 2 deletions stacks-signer/src/signerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,18 @@ CREATE TABLE IF NOT EXISTS signer_state_machine_updates (
PRIMARY KEY (signer_addr, reward_cycle)
) STRICT;"#;

static CREATE_BLOCK_PRE_COMMITS_TABLE: &str = r#"
CREATE TABLE IF NOT EXISTS block_pre_commits (
-- The block sighash commits to all of the stacks and burnchain state as of its parent,
-- as well as the tenure itself so there's no need to include the reward cycle. Just
-- the sighash is sufficient to uniquely identify the block across all burnchain, PoX,
-- and stacks forks.
signer_signature_hash TEXT NOT NULL,
-- signer address committing to sign the block
signer_addr TEXT NOT NULL,
PRIMARY KEY (signer_signature_hash, signer_addr)
) STRICT;"#;

static SCHEMA_1: &[&str] = &[
DROP_SCHEMA_0,
CREATE_DB_CONFIG,
Expand Down Expand Up @@ -613,9 +625,14 @@ static SCHEMA_11: &[&str] = &[
"INSERT INTO db_config (version) VALUES (11);",
];

static SCHEMA_12: &[&str] = &[
CREATE_BLOCK_PRE_COMMITS_TABLE,
"INSERT INTO db_config (version) VALUES (12);",
];

impl SignerDb {
/// The current schema version used in this build of the signer binary.
pub const SCHEMA_VERSION: u32 = 11;
pub const SCHEMA_VERSION: u32 = 12;

/// Create a new `SignerState` instance.
/// This will create a new SQLite database at the given path
Expand Down Expand Up @@ -799,6 +816,20 @@ impl SignerDb {
Ok(())
}

/// Migrate from schema 11 to schema 12
fn schema_12_migration(tx: &Transaction) -> Result<(), DBError> {
if Self::get_schema_version(tx)? >= 12 {
// no migration necessary
return Ok(());
}

for statement in SCHEMA_12.iter() {
tx.execute_batch(statement)?;
}

Ok(())
}

/// Register custom scalar functions used by the database
fn register_scalar_functions(&self) -> Result<(), DBError> {
// Register helper function for determining if a block is a tenure change transaction
Expand Down Expand Up @@ -843,7 +874,8 @@ impl SignerDb {
8 => Self::schema_9_migration(&sql_tx)?,
9 => Self::schema_10_migration(&sql_tx)?,
10 => Self::schema_11_migration(&sql_tx)?,
11 => break,
11 => Self::schema_12_migration(&sql_tx)?,
12 => break,
x => return Err(DBError::Other(format!(
"Database schema is newer than supported by this binary. Expected version = {}, Database version = {x}",
Self::SCHEMA_VERSION,
Expand Down Expand Up @@ -1428,6 +1460,39 @@ impl SignerDb {
None => Ok(0),
}
}

/// Record an observed block pre commit
pub fn add_block_pre_commit(
&self,
block_sighash: &Sha512Trunc256Sum,
address: &StacksAddress,
) -> Result<(), DBError> {
let qry = "INSERT OR REPLACE INTO block_pre_commits (signer_signature_hash, signer_addr) VALUES (?1, ?2);";
let args = params![block_sighash, address.to_string()];

debug!("Inserting block pre commit.";
"signer_signature_hash" => %block_sighash,
"signer_addr" => %address);

self.db.execute(qry, args)?;
Ok(())
}

/// Get all pre committers for a block
pub fn get_block_pre_committers(
&self,
block_sighash: &Sha512Trunc256Sum,
) -> Result<Vec<StacksAddress>, DBError> {
let qry = "SELECT signer_addr FROM block_pre_commits WHERE signer_signature_hash = ?1";
let args = params![block_sighash];
let addrs_txt: Vec<String> = query_rows(&self.db, qry, args)?;

let res: Result<Vec<_>, _> = addrs_txt
.into_iter()
.map(|addr| StacksAddress::from_string(&addr).ok_or(DBError::Corruption))
.collect();
res
}
}

fn try_deserialize<T>(s: Option<String>) -> Result<Option<T>, DBError>
Expand Down Expand Up @@ -2619,4 +2684,49 @@ pub mod tests {
"latency between updates should be 10 second"
);
}

#[test]
fn insert_and_get_state_block_pre_commits() {
let db_path = tmp_db_path();
let db = SignerDb::new(db_path).expect("Failed to create signer db");
let block_sighash1 = Sha512Trunc256Sum([1u8; 32]);
let address1 = StacksAddress::p2pkh(
false,
&StacksPublicKey::from_private(&StacksPrivateKey::random()),
);
let block_sighash2 = Sha512Trunc256Sum([2u8; 32]);
let address2 = StacksAddress::p2pkh(
false,
&StacksPublicKey::from_private(&StacksPrivateKey::random()),
);
let address3 = StacksAddress::p2pkh(
false,
&StacksPublicKey::from_private(&StacksPrivateKey::random()),
);
assert!(db
.get_block_pre_committers(&block_sighash1)
.unwrap()
.is_empty());

db.add_block_pre_commit(&block_sighash1, &address1).unwrap();
assert_eq!(
db.get_block_pre_committers(&block_sighash1).unwrap(),
vec![address1]
);

db.add_block_pre_commit(&block_sighash1, &address2).unwrap();
let commits = db.get_block_pre_committers(&block_sighash1).unwrap();
assert_eq!(commits.len(), 2);
assert!(commits.contains(&address2));
assert!(commits.contains(&address1));

db.add_block_pre_commit(&block_sighash2, &address3).unwrap();
let commits = db.get_block_pre_committers(&block_sighash1).unwrap();
assert_eq!(commits.len(), 2);
assert!(commits.contains(&address2));
assert!(commits.contains(&address1));
let commits = db.get_block_pre_committers(&block_sighash2).unwrap();
assert_eq!(commits.len(), 1);
assert!(commits.contains(&address3));
}
}
Loading
Loading