diff --git a/Cargo.lock b/Cargo.lock index cc91fd199..9ea41082a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,6 +831,7 @@ dependencies = [ name = "alpen-express-primitives" version = "0.1.0" dependencies = [ + "alpen-test-utils", "arbitrary", "bitcoin", "borsh", @@ -838,6 +839,7 @@ dependencies = [ "hex", "rand 0.8.5", "reth-primitives", + "secp256k1", "serde", "serde_json", "sha2 0.10.8", @@ -913,6 +915,7 @@ dependencies = [ "hex", "serde", "serde_json", + "serde_with", ] [[package]] diff --git a/crates/db/src/interfaces/bridge_relay.rs b/crates/db/src/interfaces/bridge_relay.rs new file mode 100644 index 000000000..5db599a56 --- /dev/null +++ b/crates/db/src/interfaces/bridge_relay.rs @@ -0,0 +1,16 @@ +use alpen_express_primitives::relay::types::BridgeMessage; + +use crate::DbResult; + +/// Interface for storing and retrieving bridge messages. +#[cfg_attr(feature = "mocks", automock)] +pub trait BridgeMessageDb { + /// Stores a bridge message + fn write_msg(&self, id: u128, msg: BridgeMessage) -> DbResult<()>; + + /// Deletes messages by their UNIX epoch. + fn delete_msgs_before_timestamp(&self, msg_ids: u128) -> DbResult<()>; + + /// Retrieves messages by their scope. + fn get_msgs_by_scope(&self, scope: &[u8]) -> DbResult>; +} diff --git a/crates/db/src/interfaces/mod.rs b/crates/db/src/interfaces/mod.rs new file mode 100644 index 000000000..a18adb4aa --- /dev/null +++ b/crates/db/src/interfaces/mod.rs @@ -0,0 +1 @@ +pub mod bridge_relay; diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index ac1f68ee7..ac1918896 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -3,6 +3,7 @@ pub mod bridge; pub mod database; pub mod errors; +pub mod interfaces; pub mod traits; pub mod types; diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index c7c80e2f0..1b9ff9baa 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -9,11 +9,18 @@ bitcoin = { workspace = true } borsh = { workspace = true } digest = { workspace = true } hex = { workspace = true } +rand = { workspace = true, optional = true } reth-primitives = { workspace = true } +secp256k1 = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } thiserror = { workspace = true } [dev-dependencies] -rand = { workspace = true } +alpen-test-utils = { workspace = true } + +[features] +default = ["std", "rand"] +std = ["dep:secp256k1"] +rand = ["std", "dep:rand"] diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 102e749bf..b3f6c373d 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -1,6 +1,5 @@ //! Collection of generic internal data types that are used widely. -// TODO import hash types, routines // TODO import address types // TODO import generic account types @@ -11,6 +10,8 @@ pub mod errors; pub mod evm_exec; pub mod hash; pub mod l1; +pub mod operator; pub mod params; pub mod prelude; +pub mod relay; pub mod utils; diff --git a/crates/primitives/src/operator.rs b/crates/primitives/src/operator.rs new file mode 100644 index 000000000..fe82f2374 --- /dev/null +++ b/crates/primitives/src/operator.rs @@ -0,0 +1,30 @@ +use super::bridge::OperatorIdx; +use crate::prelude::Buf32; + +/// Some type that can provide operator keys. +pub trait OperatorKeyProvider { + /// Returns the operator's signing pubkey, if it exists in the table. + fn get_operator_signing_pk(&self, idx: OperatorIdx) -> Option; +} + +/// Stub key provider that can be used for testing. +pub struct StubOpKeyProv { + expected_idx: OperatorIdx, + pk: Buf32, +} + +impl StubOpKeyProv { + pub fn new(expected_idx: OperatorIdx, pk: Buf32) -> Self { + Self { expected_idx, pk } + } +} + +impl OperatorKeyProvider for StubOpKeyProv { + fn get_operator_signing_pk(&self, idx: OperatorIdx) -> Option { + if idx == self.expected_idx { + Some(self.pk) + } else { + None + } + } +} diff --git a/crates/primitives/src/relay/mod.rs b/crates/primitives/src/relay/mod.rs new file mode 100644 index 000000000..13dc47246 --- /dev/null +++ b/crates/primitives/src/relay/mod.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod util; diff --git a/crates/primitives/src/relay/types.rs b/crates/primitives/src/relay/types.rs new file mode 100644 index 000000000..8632c886a --- /dev/null +++ b/crates/primitives/src/relay/types.rs @@ -0,0 +1,192 @@ +#![allow(dead_code)] + +use core::fmt; + +use arbitrary::Arbitrary; +use borsh::{io, BorshDeserialize, BorshSerialize}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; + +use crate::buf::{Buf32, Buf64}; + +/// Message container used to direct payloads depending on the context between parties. +#[derive(Clone, Debug, Eq, PartialEq, BorshDeserialize, BorshSerialize, Deserialize, Serialize)] +pub struct BridgeMessage { + /// Operator ID + pub(crate) source_id: u32, + + /// Schnorr signature of the message + pub(crate) sig: Buf64, + + /// Purpose of the message. + pub(crate) scope: Vec, + + /// serialized message + pub(crate) payload: Vec, +} + +impl<'a> Arbitrary<'a> for BridgeMessage { + fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { + let source_id = u32::arbitrary(u)?; + let sig = Buf64::arbitrary(u)?; + let scope = borsh::to_vec(&Scope::Misc).unwrap(); + let mut payload = vec![0; 20]; + u.fill_buffer(&mut payload)?; + + Ok(Self { + source_id, + sig, + scope, + payload, + }) + } +} + +impl BridgeMessage { + /// Source ID. + pub fn source_id(&self) -> u32 { + self.source_id + } + + /// Signature. + pub fn signature(&self) -> &Buf64 { + &self.sig + } + + /// Raw scope. + pub fn scope(&self) -> &[u8] { + &self.scope + } + + /// Raw payload + pub fn payload(&self) -> &[u8] { + &self.payload + } + + /// Tries to parse the scope buf as a typed scope. + pub fn try_parse_scope(&self) -> Option { + Scope::try_from_slice(self.scope()).ok() + } + + /// Computes a msg ID based on the . + pub fn compute_id(&self) -> BridgeMsgId { + // No signature because it might be malleable and it doesn't have any + // useful data in it we'd want to inspect. + let mut digest = Sha256::default(); + digest.update(&self.source_id.to_be_bytes()); + digest.update(&(self.scope.len() as u64).to_be_bytes()); + digest.update(&self.scope); + digest.update(&(self.payload.len() as u64).to_be_bytes()); + digest.update(&self.payload); + + let hash: [u8; 32] = digest.finalize().into(); + BridgeMsgId::from(Buf32::from(hash)) + } +} + +/// Scope of the [`BridgeMessage`] +#[derive(Clone, Debug, Eq, PartialEq, BorshDeserialize, BorshSerialize, Deserialize, Serialize)] +pub enum Scope { + /// Used for debugging purposes. + Misc, + + /// Deposit Signature with Outpoint. + // TODO make this contain the outpoint + V0DepositSig(u32), + + /// Withdrawal Signature with Deposit index. + V0WithdrawalSig(u32), +} + +impl Scope { + /// Tries to parse the scope from a slice. + pub fn try_from_slice(raw: &[u8]) -> Result { + Ok(borsh::from_slice(raw)?) + } +} + +/// ID of a [``BridgeMessage``] computed from the sender ID, scope, and payload. +#[derive( + Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Arbitrary, BorshDeserialize, BorshSerialize, +)] +pub struct BridgeMsgId(Buf32); + +impl BridgeMsgId { + pub fn inner(&self) -> &Buf32 { + &self.0 + } + + pub fn into_inner(self) -> Buf32 { + self.0 + } +} + +impl From for BridgeMsgId { + fn from(value: Buf32) -> Self { + Self(value) + } +} + +impl From for Buf32 { + fn from(value: BridgeMsgId) -> Self { + value.0 + } +} + +impl AsRef<[u8; 32]> for BridgeMsgId { + fn as_ref(&self) -> &[u8; 32] { + self.0.as_ref() + } +} + +impl fmt::Debug for BridgeMsgId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} + +impl fmt::Display for BridgeMsgId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +#[derive(Copy, Clone, Deserialize, Debug)] +pub struct RelayerConfig { + /// Time we check for purgeable messages. + pub refresh_interval: u64, + + /// Age after which we'll start to re-relay a message if we recv it again. + pub stale_duration: u64, + + /// Relay misc messages that don't check signatures. + pub relay_misc: bool, +} + +#[cfg(test)] +mod tests { + use alpen_test_utils::ArbitraryGenerator; + + use super::{BridgeMessage, Scope}; + use crate::buf::Buf64; + + fn get_arb_bridge_msg() -> BridgeMessage { + let msg: BridgeMessage = ArbitraryGenerator::new().generate(); + msg + } + + fn make_bridge_msg() -> BridgeMessage { + BridgeMessage { + source_id: 1, + sig: Buf64::from([0; 64]), + scope: borsh::to_vec(&Scope::Misc).unwrap(), + payload: vec![1, 2, 3, 4, 5], + } + } + + #[test] + fn test_get_scope_raw() { + let msg = make_bridge_msg(); + assert_eq!(msg.scope(), vec![0]) + } +} diff --git a/crates/primitives/src/relay/util.rs b/crates/primitives/src/relay/util.rs new file mode 100644 index 000000000..145a7d0d7 --- /dev/null +++ b/crates/primitives/src/relay/util.rs @@ -0,0 +1,179 @@ +use std::sync::Arc; + +use rand::rngs::OsRng; +use secp256k1::{schnorr::Signature, All, Keypair, Message, Secp256k1, SecretKey, XOnlyPublicKey}; +use thiserror::Error; + +use super::types::{BridgeMessage, Scope}; +use crate::{ + buf::{Buf32, Buf64}, + operator::OperatorKeyProvider, +}; + +/// Contains data needed to construct bridge messages. +#[derive(Clone)] +pub struct MessageSigner { + operator_idx: u32, + msg_signing_sk: Buf32, + secp: Arc>, +} + +impl MessageSigner { + pub fn new(operator_idx: u32, msg_signing_sk: Buf32, secp: Arc>) -> Self { + Self { + operator_idx, + msg_signing_sk, + secp, + } + } + + /// Gets the idx of the operator that we are using for signing messages. + pub fn operator_idx(&self) -> u32 { + self.operator_idx + } + + /// Gets the pubkey corresponding to the internal msg signing sk. + pub fn get_pubkey(&self) -> Buf32 { + compute_pubkey_for_privkey(&self.msg_signing_sk, self.secp.as_ref()) + } + + /// Signs a message using a raw scope and payload. + pub fn sign_raw(&self, scope: Vec, payload: Vec) -> BridgeMessage { + let mut tmp_m = BridgeMessage { + source_id: self.operator_idx, + sig: Buf64::zero(), + scope, + payload, + }; + + let id: Buf32 = tmp_m.compute_id().into(); + let sig = sign_msg_hash(&self.msg_signing_sk, &id, self.secp.as_ref()); + tmp_m.sig = sig; + + tmp_m + } + + /// Signs a message with some particular typed scope. + pub fn sign_scope(&self, scope: &Scope, payload: Vec) -> BridgeMessage { + let raw_scope = borsh::to_vec(scope).unwrap(); + self.sign_raw(raw_scope, payload) + } +} + +/// Computes the corresponding x-only pubkey as a buf32 for an sk. +#[cfg(feature = "std")] +pub fn compute_pubkey_for_privkey(sk: &Buf32, secp: &Secp256k1) -> Buf32 { + let kp = Keypair::from_seckey_slice(secp, sk.as_ref()).unwrap(); + let (xonly_pk, _) = kp.public_key().x_only_public_key(); + Buf32::from(xonly_pk.serialize()) +} + +/// Generates a signature for the message. +#[cfg(all(feature = "std", feature = "rand"))] +pub fn sign_msg_hash( + sk: &Buf32, + msg_hash: &Buf32, + secp: &Secp256k1, +) -> Buf64 { + let mut rng = OsRng; + + let keypair = Keypair::from_secret_key(secp, &SecretKey::from_slice(sk.as_ref()).unwrap()); + let msg = Message::from_digest(*msg_hash.as_ref()); + let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut rng); + + Buf64::from(*sig.as_ref()) +} + +/// Returns if the signature is correct for the message. +#[cfg(feature = "std")] +pub fn verify_sig(pk: &Buf32, msg_hash: &Buf32, sig: &Buf64) -> bool { + let pk = XOnlyPublicKey::from_slice(pk.as_ref()).unwrap(); + let msg = Message::from_digest(*msg_hash.as_ref()); + let sig = Signature::from_slice(sig.as_ref()).unwrap(); + + sig.verify(&msg, &pk).is_ok() +} + +#[derive(Debug, Error)] +pub enum VerifyError { + #[error("invalid signature")] + InvalidSig, + + #[error("unknown operator idx")] + UnknownOperator, +} + +/// Verifies a bridge message using the operator table working with and checks +/// if the operator exists and verifies the signature using their pubkeys. +pub fn verify_bridge_msg_sig( + msg: &BridgeMessage, + optbl: &impl OperatorKeyProvider, +) -> Result<(), VerifyError> { + let op_signing_pk = optbl + .get_operator_signing_pk(msg.source_id()) + .ok_or(VerifyError::UnknownOperator)?; + + let msg_hash = msg.compute_id().into_inner(); + if !verify_sig(&op_signing_pk, &msg_hash, msg.signature()) { + return Err(VerifyError::InvalidSig); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{buf::Buf32, operator::StubOpKeyProv, relay::types::*}; + + #[test] + fn test_sign_verify_raw() { + let secp = secp256k1::Secp256k1::new(); + + let msg_hash = [ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, + 25, 26, 27, 28, 29, 30, 31, 32, + ]; + let msg_hash = Buf32::from(msg_hash); + let sk = Buf32::from([3; 32]); + let pk = compute_pubkey_for_privkey(&sk, &secp); + + let sig = sign_msg_hash(&sk, &msg_hash, &secp); + assert!(verify_sig(&pk, &msg_hash, &sig)); + } + + #[test] + fn test_sign_verify_msg_ok() { + let secp = Arc::new(Secp256k1::new()); + let sk = Buf32::from([1; 32]); + + let idx = 4; + let signer = MessageSigner::new(idx, sk, secp); + let pk = signer.get_pubkey(); + + let payload = vec![1, 2, 3, 4, 5]; + let m = signer.sign_scope(&Scope::Misc, payload); + + let stub_prov = StubOpKeyProv::new(idx, pk); + assert!(verify_bridge_msg_sig(&m, &stub_prov).is_ok()); + } + + #[test] + fn test_sign_verify_msg_fail() { + let secp = Arc::new(Secp256k1::new()); + let sk = Buf32::from([1; 32]); + + let idx = 4; + let signer = MessageSigner::new(idx, sk, secp); + let pk = signer.get_pubkey(); + + let payload = vec![1, 2, 3, 4, 5]; + let mut m = signer.sign_scope(&Scope::Misc, payload); + m.sig = Buf64::zero(); + + let stub_prov = StubOpKeyProv::new(idx, pk); + assert!(verify_bridge_msg_sig(&m, &stub_prov).is_err()); + } + + // TODO add verify fail check +} diff --git a/crates/primitives/src/utils.rs b/crates/primitives/src/utils.rs index b53b21cad..3acca9751 100644 --- a/crates/primitives/src/utils.rs +++ b/crates/primitives/src/utils.rs @@ -3,6 +3,7 @@ use bitcoin::{ hashes::{sha256d, Hash}, Block, Wtxid, }; +use serde::{Deserialize, Serialize}; use crate::{ l1::{L1Tx, L1TxProof}, @@ -106,6 +107,49 @@ pub fn generate_l1_tx(idx: u32, block: &Block) -> L1Tx { L1Tx::new(proof, tx) } +/// Temporary schnorr keypair. +// FIXME why temporary? +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct SchnorrKeypair { + /// Secret key. + pub sk: Buf32, + + /// Public key. + pub pk: Buf32, +} + +/// Get the temporary schnorr keypairs for testing purpose +/// These are generated randomly and added here just for functional tests till we don't have proper +/// genesis configuration plus operator addition mechanism ready +// FIXME remove +pub fn get_test_schnorr_keys() -> [SchnorrKeypair; 2] { + let sk1 = Buf32::from([ + 155, 178, 84, 107, 54, 0, 197, 195, 174, 240, 129, 191, 24, 173, 144, 52, 153, 57, 41, 184, + 222, 115, 62, 245, 106, 42, 26, 164, 241, 93, 63, 148, + ]); + + let sk2 = Buf32::from([ + 1, 192, 58, 188, 113, 238, 155, 119, 2, 231, 5, 226, 190, 131, 111, 184, 17, 104, 35, 133, + 112, 56, 145, 93, 55, 28, 70, 211, 190, 189, 33, 76, + ]); + + let pk1 = Buf32::from([ + 200, 254, 220, 180, 229, 125, 231, 84, 201, 194, 33, 54, 218, 238, 223, 231, 31, 17, 65, 8, + 94, 1, 2, 140, 184, 91, 193, 237, 28, 80, 34, 141, + ]); + + let pk2 = Buf32::from([ + 0xfa, 0x78, 0x77, 0x2d, 0x6a, 0x9a, 0xb0, 0x1a, 0x61, 0x0a, 0xb8, 0xf2, 0xfd, 0xb9, 0x01, + 0xba, 0xf3, 0x0a, 0xb2, 0x09, 0x3e, 0x53, 0xff, 0xc3, 0x1c, 0xc2, 0x81, 0xee, 0x07, 0x07, + 0x9f, 0x92, + ]); + + [ + SchnorrKeypair { sk: sk1, pk: pk1 }, + SchnorrKeypair { sk: sk2, pk: pk2 }, + ] +} + #[cfg(test)] mod tests { use bitcoin::consensus::deserialize; diff --git a/crates/rocksdb-store/src/bridge_relay/db.rs b/crates/rocksdb-store/src/bridge_relay/db.rs new file mode 100644 index 000000000..53275352c --- /dev/null +++ b/crates/rocksdb-store/src/bridge_relay/db.rs @@ -0,0 +1,215 @@ +#![allow(unused)] + +use std::sync::Arc; + +use alpen_express_db::{interfaces::bridge_relay::BridgeMessageDb, DbError, DbResult}; +use alpen_express_primitives::relay::types::{BridgeMessage, Scope}; +use rockbound::{ + utils::get_last, OptimisticTransactionDB as DB, SchemaBatch, SchemaDBOperationsExt, + TransactionRetry, +}; + +use super::schemas::{BridgeMsgIdSchema, ScopeMsgIdSchema}; +use crate::DbOpsConfig; + +pub struct BridgeMsgDb { + db: Arc, + ops: DbOpsConfig, +} + +impl BridgeMsgDb { + pub fn new(db: Arc, ops: DbOpsConfig) -> Self { + Self { db, ops } + } + + fn get_msg_ids_before_timestamp(&self, msg_id: u128) -> DbResult> { + // reverse and then place a iterator here + let mut iterator = self.db.iter::()?; + iterator.seek_to_first(); + + let mut ids = Vec::new(); + for res in iterator { + let (timestamp, _) = res?.into_tuple(); + if timestamp <= msg_id { + ids.push(timestamp); + } + } + + Ok(ids) + } +} + +impl BridgeMessageDb for BridgeMsgDb { + fn write_msg(&self, id: u128, msg: BridgeMessage) -> alpen_express_db::DbResult<()> { + let mut id = id; + while self.db.get::(&id)?.is_some() { + id += 1; + } + + self.db.put::(&id, &msg); + + let scope = msg.scope().to_owned(); + + if let Some(scopes) = self.db.get::(&scope)? { + let mut new_scopes = Vec::new(); + new_scopes.extend(&scopes); + new_scopes.push(id); + self.db.put::(&scope, &new_scopes)?; + } else { + self.db.put::(&scope, &vec![id])?; + } + + Ok(()) + } + + fn delete_msgs_before_timestamp(&self, msg_id: u128) -> DbResult<()> { + let ids = self.get_msg_ids_before_timestamp(msg_id)?; + + let mut batch = SchemaBatch::new(); + for id in ids { + batch.delete::(&id)?; + } + + self.db.write_schemas(batch)?; + Ok(()) + } + + fn get_msgs_by_scope(&self, scope: &[u8]) -> DbResult> { + // Regular loop for filtering and mapping + let Some(msg_ids) = self.db.get(&scope.to_owned())? else { + return Ok(Vec::new()); + }; + + let mut msgs = Vec::new(); + + // Iterating over filtered message IDs to fetch messages + for id in msg_ids { + let Some(message) = self.db.get::(&id)? else { + continue; + }; + + msgs.push(message); + } + + Ok(msgs) + } +} + +#[cfg(feature = "test_utils")] +#[cfg(test)] +mod tests { + use std::time::{SystemTime, UNIX_EPOCH}; + + use alpen_express_primitives::{l1::L1TxProof, relay::types::BridgeMessage}; + use alpen_test_utils::ArbitraryGenerator; + + use super::*; + use crate::test_utils::get_rocksdb_tmp_instance; + + fn setup_db() -> BridgeMsgDb { + let (db, db_ops) = get_rocksdb_tmp_instance().unwrap(); + BridgeMsgDb::new(db, db_ops) + } + + fn make_bridge_msg() -> (u128, BridgeMessage) { + let arb = ArbitraryGenerator::new(); + + let msg: BridgeMessage = arb.generate(); + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros(); + + (timestamp, msg) + } + + #[test] + fn test_write_msgs() { + let br_db = setup_db(); + let (timestamp, msg) = make_bridge_msg(); + + let result = br_db.write_msg(timestamp, msg); + assert!(result.is_ok()); + } + + #[test] + fn test_get_msg_ids_before_timestamp() { + let br_db = setup_db(); + let (timestamp1, msg1) = make_bridge_msg(); + let (timestamp2, _) = make_bridge_msg(); + let (timestamp3, msg2) = make_bridge_msg(); + + // Write messages to the database + br_db.write_msg(timestamp1, msg1).unwrap(); + br_db.write_msg(timestamp3, msg2).unwrap(); + + // Retrieve message IDs before the second timestamp + let result = br_db.get_msg_ids_before_timestamp(timestamp2); + assert!(result.is_ok()); + + let ids = result.unwrap(); + assert!(ids.contains(×tamp1)); + } + + #[test] + fn test_delete_msgs_before_timestamp() { + let br_db = setup_db(); + let (timestamp1, msg1) = make_bridge_msg(); + let (timestamp2, msg2) = make_bridge_msg(); + + // Write messages to the database + br_db.write_msg(timestamp1, msg1).unwrap(); + br_db.write_msg(timestamp2, msg2).unwrap(); + // Delete messages before the second timestamp + let result = br_db.delete_msgs_before_timestamp(timestamp2); + assert!(result.is_ok()); + + // Check if only the second message remains + let ids = br_db.get_msg_ids_before_timestamp(u128::MAX).unwrap(); + assert!(!ids.contains(×tamp1)); + } + + #[test] + fn test_get_msgs_by_scope() { + let br_db = setup_db(); + let (timestamp1, mut msg1) = make_bridge_msg(); + let (timestamp2, mut msg2) = make_bridge_msg(); + + // Write messages to the database + br_db.write_msg(timestamp1, msg1.clone()).unwrap(); + br_db.write_msg(timestamp2, msg2.clone()).unwrap(); + + // Retrieve messages by scope + let result = br_db.get_msgs_by_scope(msg1.scope()); + assert!(result.is_ok()); + + assert!(!result.unwrap().is_empty()); + } + + #[test] + fn test_no_messages_for_nonexistent_scope() { + let br_db = setup_db(); + let (timestamp, msg) = make_bridge_msg(); + let scope = msg.scope().to_vec(); + + // Write message to the database + br_db + .write_msg(timestamp, msg) + .expect("test: insert bridge msg"); + + // Try to retrieve messages with a different scope + let result = br_db + .get_msgs_by_scope(&[42]) + .expect("test: fetch bridge msg"); + assert!(result.is_empty()); + + // Try to retrieve messages with a different scope + let result = br_db + .get_msgs_by_scope(&scope) + .expect("test: fetch bridge msg"); + + // Should not be empty since we're using the scope of the message we put in. + assert!(!result.is_empty()); + } +} diff --git a/crates/rocksdb-store/src/bridge_relay/mod.rs b/crates/rocksdb-store/src/bridge_relay/mod.rs new file mode 100644 index 000000000..29bfe2f8a --- /dev/null +++ b/crates/rocksdb-store/src/bridge_relay/mod.rs @@ -0,0 +1,2 @@ +pub mod db; +pub mod schemas; diff --git a/crates/rocksdb-store/src/bridge_relay/schemas.rs b/crates/rocksdb-store/src/bridge_relay/schemas.rs new file mode 100644 index 000000000..da8bdba58 --- /dev/null +++ b/crates/rocksdb-store/src/bridge_relay/schemas.rs @@ -0,0 +1,16 @@ +use alpen_express_primitives::relay::types::BridgeMessage; + +use crate::{ + define_table_with_default_codec, define_table_with_seek_key_codec, define_table_without_codec, + impl_borsh_value_codec, +}; + +define_table_with_seek_key_codec!( + /// A table to store mapping of Unix epoch to Bridge Message + (BridgeMsgIdSchema) u128 => BridgeMessage +); + +define_table_with_default_codec!( + /// A table to store mapping of scope to Bridge Message Ids + (ScopeMsgIdSchema) Vec => Vec +); diff --git a/crates/rocksdb-store/src/lib.rs b/crates/rocksdb-store/src/lib.rs index 9c58ef4a0..8455893cc 100644 --- a/crates/rocksdb-store/src/lib.rs +++ b/crates/rocksdb-store/src/lib.rs @@ -1,14 +1,17 @@ +pub mod bridge_relay; pub mod broadcaster; pub mod chain_state; pub mod client_state; pub mod l1; pub mod l2; -pub mod macros; pub mod sequencer; pub mod sync_event; + +pub mod macros; +pub mod utils; + #[cfg(feature = "test_utils")] pub mod test_utils; -pub mod utils; pub const ROCKSDB_NAME: &str = "express"; @@ -31,10 +34,15 @@ pub const STORE_COLUMN_FAMILIES: &[ColumnFamilyName] = &[ // Bcast schemas BcastL1TxIdSchema::COLUMN_FAMILY_NAME, BcastL1TxSchema::COLUMN_FAMILY_NAME, + // Bridge relay schemas + BridgeMsgIdSchema::COLUMN_FAMILY_NAME, + ScopeMsgIdSchema::COLUMN_FAMILY_NAME, // TODO add col families for other store types ]; // Re-exports +pub use bridge_relay::db::BridgeMsgDb; +use bridge_relay::schemas::*; pub use broadcaster::db::BroadcastDb; use broadcaster::schemas::{BcastL1TxIdSchema, BcastL1TxSchema}; pub use chain_state::db::ChainStateDb; diff --git a/crates/rpc/api/src/lib.rs b/crates/rpc/api/src/lib.rs index dea94bb64..51af3aa6f 100644 --- a/crates/rpc/api/src/lib.rs +++ b/crates/rpc/api/src/lib.rs @@ -66,6 +66,14 @@ pub trait AlpenApi { #[method(name = "getRawBundleById")] async fn get_raw_bundle_by_id(&self, block_id: L2BlockId) -> RpcResult>; + + /// Get message by scope, Currently either Deposit or Withdrawal + #[method(name = "getBridgeMsgsByScope")] + async fn get_msgs_by_scope(&self, scope: HexBytes) -> RpcResult>; + + /// Submit raw messages + #[method(name = "submitBridgeMsg")] + async fn submit_bridge_msg(&self, raw_msg: HexBytes) -> RpcResult<()>; } #[cfg_attr(not(feature = "client"), rpc(server, namespace = "alpadmin"))] diff --git a/crates/rpc/api/src/types.rs b/crates/rpc/api/src/types.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/rpc/types/Cargo.toml b/crates/rpc/types/Cargo.toml index 064688082..dcfca9f24 100644 --- a/crates/rpc/types/Cargo.toml +++ b/crates/rpc/types/Cargo.toml @@ -8,4 +8,5 @@ alpen-express-state = { workspace = true } bitcoin = { workspace = true } hex = { workspace = true } serde = { workspace = true } +serde_with = { workspace = true } serde_json = { workspace = true } diff --git a/crates/state/src/bridge_state.rs b/crates/state/src/bridge_state.rs index 8b821e563..821f64be3 100644 --- a/crates/state/src/bridge_state.rs +++ b/crates/state/src/bridge_state.rs @@ -7,6 +7,7 @@ use alpen_express_primitives::{ bridge::OperatorIdx, buf::Buf32, l1::{self, BitcoinAmount, OutputRef, XOnlyPk}, + operator::OperatorKeyProvider, }; use borsh::{BorshDeserialize, BorshSerialize}; use serde::{Deserialize, Serialize}; @@ -29,6 +30,12 @@ pub struct OperatorEntry { wallet_pk: Buf32, } +impl OperatorEntry { + pub fn signing_pk(&self) -> &Buf32 { + &self.signing_pk + } +} + #[derive(Clone, Debug, Eq, PartialEq, BorshDeserialize, BorshSerialize)] pub struct OperatorTable { /// Next unassigned operator index. @@ -86,6 +93,12 @@ impl OperatorTable { } } +impl OperatorKeyProvider for OperatorTable { + fn get_operator_signing_pk(&self, idx: OperatorIdx) -> Option { + self.get_operator(idx).map(|ent| ent.signing_pk) + } +} + #[derive(Clone, Debug, Eq, PartialEq, BorshDeserialize, BorshSerialize)] pub struct DepositsTable { /// Next unassigned deposit index. diff --git a/crates/storage/src/ops/bridge_relay.rs b/crates/storage/src/ops/bridge_relay.rs new file mode 100644 index 000000000..e07b8fbfa --- /dev/null +++ b/crates/storage/src/ops/bridge_relay.rs @@ -0,0 +1,53 @@ +//! Bridge Msg operation interface + +use std::sync::Arc; + +use alpen_express_db::interfaces::bridge_relay::BridgeMessageDb; +use alpen_express_primitives::relay::types::BridgeMessage; + +use crate::exec::*; + +/// Database context for an database operation interface. +pub struct Context { + db: Arc, +} + +impl Context { + pub fn new(db: Arc) -> Self { + Self { db } + } + + pub fn into_ops(self, pool: threadpool::ThreadPool) -> BridgeMsgOps { + BridgeMsgOps::new(pool, Arc::new(self)) + } +} + +inst_ops! { + (BridgeMsgOps, Context) { + write_msg(id: u128, msg: BridgeMessage) => (); + delete_msgs_before_timestamp(msg_ids: u128) => (); + get_msgs_by_scope(scope: Vec) => Vec; + } +} + +fn write_msg( + context: &Context, + id: u128, + msg: BridgeMessage, +) -> DbResult<()> { + context.db.write_msg(id, msg) +} + +fn delete_msgs_before_timestamp( + context: &Context, + msg_ids: u128, +) -> DbResult<()> { + context.db.delete_msgs_before_timestamp(msg_ids) +} + +fn get_msgs_by_scope( + context: &Context, + scope: Vec, +) -> DbResult> { + context.db.get_msgs_by_scope(&scope) +} diff --git a/crates/storage/src/ops/mod.rs b/crates/storage/src/ops/mod.rs index 35b71a929..f1e819993 100644 --- a/crates/storage/src/ops/mod.rs +++ b/crates/storage/src/ops/mod.rs @@ -1,3 +1,4 @@ +pub mod bridge_relay; pub mod inscription; pub mod l1tx_broadcast; pub mod l2; diff --git a/sequencer/src/rpc_server.rs b/sequencer/src/rpc_server.rs index aba6a6944..9da5ec307 100644 --- a/sequencer/src/rpc_server.rs +++ b/sequencer/src/rpc_server.rs @@ -558,6 +558,16 @@ impl AlpenApiServer for AlpenRpcImpl { .transpose()?; Ok(block) } + + async fn get_msgs_by_scope(&self, _scope: HexBytes) -> RpcResult> { + warn!("alp_getBridgeMsgsByScope not implemented"); + Ok(Vec::new()) + } + + async fn submit_bridge_msg(&self, _raw_msg: HexBytes) -> RpcResult<()> { + warn!("alp_submitBridgeMsg not implemented"); + Ok(()) + } } /// Wrapper around [``tokio::task::spawn_blocking``] that handles errors in