Skip to content

Commit

Permalink
[bridge] add responses metrics in bridge auth aggregator (#19877)
Browse files Browse the repository at this point in the history
## Description 

so we can monitor this in a dashboard.

## Test plan 

unit tests & test in production
---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
longbowlu committed Oct 18, 2024
1 parent 59e6a34 commit b636021
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 61 deletions.
9 changes: 7 additions & 2 deletions crates/sui-bridge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethers::types::Address as EthAddress;
use fastcrypto::encoding::{Encoding, Hex};
use shared_crypto::intent::Intent;
use shared_crypto::intent::IntentMessage;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::str::from_utf8;
use std::str::FromStr;
Expand Down Expand Up @@ -83,7 +84,7 @@ async fn main() -> anyhow::Result<()> {
let config = LoadedBridgeCliConfig::load(config).await?;
let metrics = Arc::new(BridgeMetrics::new_for_testing());
let sui_bridge_client =
SuiClient::<SuiSdkClient>::new(&config.sui_rpc_url, metrics).await?;
SuiClient::<SuiSdkClient>::new(&config.sui_rpc_url, metrics.clone()).await?;

let (sui_key, sui_address, gas_object_ref) = config
.get_sui_account_info()
Expand All @@ -99,7 +100,11 @@ async fn main() -> anyhow::Result<()> {
.await
.expect("Failed to get bridge committee"),
);
let agg = BridgeAuthorityAggregator::new(bridge_committee);
let agg = BridgeAuthorityAggregator::new(
bridge_committee,
metrics,
Arc::new(BTreeMap::new()),
);

// Handle Sui Side
if chain_id.is_sui_chain() {
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-bridge/src/action_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1513,9 +1513,9 @@ mod tests {

let committee = BridgeCommittee::new(authorities).unwrap();

let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
Arc::new(committee),
))));
let agg = Arc::new(ArcSwap::new(Arc::new(
BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
)));
let metrics = Arc::new(BridgeMetrics::new(&registry));
let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
let sui_token_type_tags = Arc::new(ArcSwap::new(Arc::new(sui_token_type_tags)));
Expand Down
133 changes: 107 additions & 26 deletions crates/sui-bridge/src/client/bridge_authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::client::bridge_client::BridgeClient;
use crate::crypto::BridgeAuthorityPublicKeyBytes;
use crate::crypto::BridgeAuthoritySignInfo;
use crate::error::{BridgeError, BridgeResult};
use crate::metrics::BridgeMetrics;
use crate::types::BridgeCommitteeValiditySignInfo;
use crate::types::{
BridgeAction, BridgeCommittee, CertifiedBridgeAction, VerifiedCertifiedBridgeAction,
Expand All @@ -30,10 +31,16 @@ const PREFETCH_TIMEOUT_MS: u64 = 1500;
pub struct BridgeAuthorityAggregator {
pub committee: Arc<BridgeCommittee>,
pub clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
pub metrics: Arc<BridgeMetrics>,
pub committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
}

impl BridgeAuthorityAggregator {
pub fn new(committee: Arc<BridgeCommittee>) -> Self {
pub fn new(
committee: Arc<BridgeCommittee>,
metrics: Arc<BridgeMetrics>,
committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
) -> Self {
let clients: BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>> = committee
.members()
.iter()
Expand Down Expand Up @@ -62,14 +69,30 @@ impl BridgeAuthorityAggregator {
Self {
committee,
clients: Arc::new(clients),
metrics,
committee_keys_to_names,
}
}

#[cfg(test)]
pub fn new_for_testing(committee: Arc<BridgeCommittee>) -> Self {
Self::new(
committee,
Arc::new(BridgeMetrics::new_for_testing()),
Arc::new(BTreeMap::new()),
)
}

pub async fn request_committee_signatures(
&self,
action: BridgeAction,
) -> BridgeResult<VerifiedCertifiedBridgeAction> {
let state = GetSigsState::new(action.approval_threshold(), self.committee.clone());
let state = GetSigsState::new(
action.approval_threshold(),
self.committee.clone(),
self.metrics.clone(),
self.committee_keys_to_names.clone(),
);
request_sign_bridge_action_into_certification(
action,
self.committee.clone(),
Expand All @@ -88,16 +111,25 @@ struct GetSigsState {
sigs: BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthoritySignInfo>,
validity_threshold: StakeUnit,
committee: Arc<BridgeCommittee>,
metrics: Arc<BridgeMetrics>,
committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
}

impl GetSigsState {
fn new(validity_threshold: StakeUnit, committee: Arc<BridgeCommittee>) -> Self {
fn new(
validity_threshold: StakeUnit,
committee: Arc<BridgeCommittee>,
metrics: Arc<BridgeMetrics>,
committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
) -> Self {
Self {
committee,
total_bad_stake: 0,
total_ok_stake: 0,
sigs: BTreeMap::new(),
validity_threshold,
metrics,
committee_keys_to_names,
}
}

Expand All @@ -119,7 +151,7 @@ impl GetSigsState {
match self.sigs.entry(name.clone()) {
Entry::Vacant(e) => {
e.insert(signed_action.auth_sig().clone());
self.total_ok_stake += stake;
self.add_ok_stake(stake, &name);
}
Entry::Occupied(_e) => {
return Err(BridgeError::AuthoritySignatureDuplication(format!(
Expand Down Expand Up @@ -156,7 +188,23 @@ impl GetSigsState {
}
}

fn add_bad_stake(&mut self, bad_stake: StakeUnit) {
fn add_ok_stake(&mut self, ok_stake: StakeUnit, name: &BridgeAuthorityPublicKeyBytes) {
if let Some(host_name) = self.committee_keys_to_names.get(name) {
self.metrics
.auth_agg_ok_responses
.with_label_values(&[host_name])
.inc();
}
self.total_ok_stake += ok_stake;
}

fn add_bad_stake(&mut self, bad_stake: StakeUnit, name: &BridgeAuthorityPublicKeyBytes) {
if let Some(host_name) = self.committee_keys_to_names.get(name) {
self.metrics
.auth_agg_bad_responses
.with_label_values(&[host_name])
.inc();
}
self.total_bad_stake += bad_stake;
}

Expand Down Expand Up @@ -223,7 +271,7 @@ async fn request_sign_bridge_action_into_certification(
name.concise(),
e
);
state.add_bad_stake(stake);
state.add_bad_stake(stake, &name);
}
}
}
Expand All @@ -233,7 +281,7 @@ async fn request_sign_bridge_action_into_certification(
name.concise(),
e
);
state.add_bad_stake(stake);
state.add_bad_stake(stake, &name);
}
};

Expand Down Expand Up @@ -296,7 +344,7 @@ mod tests {
}
let committee = BridgeCommittee::new(authorities.clone()).unwrap();

let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
assert_eq!(
agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
BTreeSet::from_iter(vec![
Expand All @@ -310,7 +358,7 @@ mod tests {
// authority 2 is blocklisted
authorities[2].is_blocklisted = true;
let committee = BridgeCommittee::new(authorities.clone()).unwrap();
let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
assert_eq!(
agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
BTreeSet::from_iter(vec![
Expand All @@ -323,7 +371,7 @@ mod tests {
// authority 3 has bad url
authorities[3].base_url = "".into();
let committee = BridgeCommittee::new(authorities.clone()).unwrap();
let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
assert_eq!(
agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
BTreeSet::from_iter(vec![
Expand Down Expand Up @@ -351,7 +399,7 @@ mod tests {

let committee = BridgeCommittee::new(authorities).unwrap();

let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));

let sui_tx_digest = TransactionDigest::random();
let sui_tx_event_index = 0;
Expand Down Expand Up @@ -468,7 +516,7 @@ mod tests {
let authorities_clone = authorities.clone();
let committee = Arc::new(BridgeCommittee::new(authorities_clone).unwrap());

let agg = BridgeAuthorityAggregator::new(committee.clone());
let agg = BridgeAuthorityAggregator::new_for_testing(committee.clone());

let sui_tx_digest = TransactionDigest::random();
let sui_tx_event_index = 0;
Expand Down Expand Up @@ -542,7 +590,13 @@ mod tests {

// we should receive all signatures in time, but only aggregate 2 authorities
// to achieve quorum
let state = GetSigsState::new(action.approval_threshold(), committee.clone());
let metrics = Arc::new(BridgeMetrics::new_for_testing());
let state = GetSigsState::new(
action.approval_threshold(),
committee.clone(),
metrics.clone(),
Arc::new(BTreeMap::new()),
);
let resp = request_sign_bridge_action_into_certification(
action.clone(),
agg.committee.clone(),
Expand All @@ -559,7 +613,12 @@ mod tests {

// we should receive all but the highest stake signatures in time, but still be able to
// achieve quorum with 3 sigs
let state = GetSigsState::new(action.approval_threshold(), committee.clone());
let state = GetSigsState::new(
action.approval_threshold(),
committee.clone(),
metrics.clone(),
Arc::new(BTreeMap::new()),
);
let resp = request_sign_bridge_action_into_certification(
action.clone(),
agg.committee.clone(),
Expand All @@ -576,7 +635,12 @@ mod tests {
assert!(!sig_keys.contains(&authorities[8].pubkey_bytes()));

// we should have fallen back to arrival order given that we timeout before we reach quorum
let state = GetSigsState::new(action.approval_threshold(), committee.clone());
let state = GetSigsState::new(
action.approval_threshold(),
committee.clone(),
metrics.clone(),
Arc::new(BTreeMap::new()),
);
let start = std::time::Instant::now();
let resp = request_sign_bridge_action_into_certification(
action.clone(),
Expand Down Expand Up @@ -625,7 +689,7 @@ mod tests {

let committee = BridgeCommittee::new(authorities.clone()).unwrap();

let agg = BridgeAuthorityAggregator::new(Arc::new(committee));
let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));

let sui_tx_digest = TransactionDigest::random();
let sui_tx_event_index = 0;
Expand Down Expand Up @@ -721,40 +785,52 @@ mod tests {
let committee = BridgeCommittee::new(authorities.clone()).unwrap();

let threshold = VALIDITY_THRESHOLD;
let mut state = GetSigsState::new(threshold, Arc::new(committee));
let metrics = Arc::new(BridgeMetrics::new_for_testing());
let mut state = GetSigsState::new(
threshold,
Arc::new(committee),
metrics.clone(),
Arc::new(BTreeMap::new()),
);

assert!(!state.is_too_many_error());

let dummy = authorities[0].pubkey_bytes();
// bad stake: 2500
state.add_bad_stake(2500);
state.add_bad_stake(2500, &dummy);
assert!(!state.is_too_many_error());

// bad stake ; 5000
state.add_bad_stake(2500);
state.add_bad_stake(2500, &dummy);
assert!(!state.is_too_many_error());

// bad stake : 6666
state.add_bad_stake(1666);
state.add_bad_stake(1666, &dummy);
assert!(!state.is_too_many_error());

// bad stake : 6667 - too many errors
state.add_bad_stake(1);
state.add_bad_stake(1, &dummy);
assert!(state.is_too_many_error());

// Authority 0 is blocklisted, we lose 2500 stake
authorities[0].is_blocklisted = true;
let committee = BridgeCommittee::new(authorities.clone()).unwrap();
let threshold = VALIDITY_THRESHOLD;
let mut state = GetSigsState::new(threshold, Arc::new(committee));
let metrics = Arc::new(BridgeMetrics::new_for_testing());
let mut state = GetSigsState::new(
threshold,
Arc::new(committee),
metrics.clone(),
Arc::new(BTreeMap::new()),
);

assert!(!state.is_too_many_error());

// bad stake: 2500 + 2500
state.add_bad_stake(2500);
state.add_bad_stake(2500, &dummy);
assert!(!state.is_too_many_error());

// bad stake: 5000 + 2500 - too many errors
state.add_bad_stake(2500);
state.add_bad_stake(2500, &dummy);
assert!(state.is_too_many_error());

// Below we test `handle_verified_signed_action`
Expand All @@ -764,7 +840,12 @@ mod tests {
authorities[3].is_blocklisted = true; // blocklist authority 3
let committee = BridgeCommittee::new(authorities.clone()).unwrap();
let threshold = VALIDITY_THRESHOLD;
let mut state = GetSigsState::new(threshold, Arc::new(committee.clone()));
let mut state = GetSigsState::new(
threshold,
Arc::new(committee.clone()),
metrics.clone(),
Arc::new(BTreeMap::new()),
);

let sui_tx_digest = TransactionDigest::random();
let sui_tx_event_index = 0;
Expand Down
3 changes: 1 addition & 2 deletions crates/sui-bridge/src/e2e_tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ async fn test_add_new_coins_on_sui_and_eth() {
.with_num_validators(3)
.build()
.await;

let bridge_arg = bridge_test_cluster.get_mut_bridge_arg().await.unwrap();

// Register tokens on Sui
Expand Down Expand Up @@ -239,7 +238,7 @@ async fn test_add_new_coins_on_sui_and_eth() {
.await
.expect("Failed to get bridge committee"),
);
let agg = BridgeAuthorityAggregator::new(bridge_committee);
let agg = BridgeAuthorityAggregator::new_for_testing(bridge_committee);
let certified_sui_action = agg
.request_committee_signatures(sui_action)
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-bridge/src/e2e_tests/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_sui_bridge_paused() {

// get pause bridge signatures from committee
let bridge_committee = Arc::new(bridge_client.get_bridge_committee().await.unwrap());
let agg = BridgeAuthorityAggregator::new(bridge_committee);
let agg = BridgeAuthorityAggregator::new_for_testing(bridge_committee);
let certified_action = agg
.request_committee_signatures(pause_action)
.await
Expand Down
Loading

0 comments on commit b636021

Please sign in to comment.