Skip to content

Commit

Permalink
refactor(relayer): Clarify logs, metrics and improve stability (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
mertwole authored Jul 17, 2024
1 parent 8ee64ef commit 9ee28a7
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 84 deletions.
43 changes: 34 additions & 9 deletions ethereum/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ pub enum TxStatus {
}

#[derive(Clone)]
pub struct EthApi(Contracts<ProviderType, Http<Client>, Ethereum>);
pub struct EthApi {
contracts: Contracts<ProviderType, Http<Client>, Ethereum>,
public_key: Address,
}

impl EthApi {
pub fn new(
Expand All @@ -84,6 +87,8 @@ impl EthApi {
None => PrivateKeySigner::random(),
};

let public_key = signer.address();

let wallet = alloy::network::EthereumWallet::from(signer);

let message_queue_address: Address = message_queue_address
Expand All @@ -104,7 +109,14 @@ impl EthApi {
relayer_address.into_array(),
)?;

Ok(EthApi(contracts))
Ok(EthApi {
contracts,
public_key,
})
}

pub async fn get_approx_balance(&self) -> Result<f64, Error> {
self.contracts.get_approx_balance(self.public_key).await
}

pub async fn provide_merkle_root(
Expand All @@ -113,7 +125,7 @@ impl EthApi {
merkle_root: [u8; 32],
proof: Vec<u8>,
) -> Result<TxHash, Error> {
self.0
self.contracts
.provide_merkle_root(
U256::from(block_number),
B256::from(merkle_root),
Expand All @@ -123,23 +135,25 @@ impl EthApi {
}

pub async fn get_tx_status(&self, tx_hash: TxHash) -> Result<TxStatus, Error> {
self.0.get_tx_status(tx_hash).await
self.contracts.get_tx_status(tx_hash).await
}

pub async fn read_finalized_merkle_root(&self, block: u32) -> Result<Option<[u8; 32]>, Error> {
self.0.read_finalized_merkle_root(U256::from(block)).await
self.contracts
.read_finalized_merkle_root(U256::from(block))
.await
}

pub async fn fetch_merkle_roots_in_range(
&self,
from: u64,
to: u64,
) -> Result<Vec<MerkleRootEntry>, Error> {
self.0.fetch_merkle_roots_in_range(from, to).await
self.contracts.fetch_merkle_roots_in_range(from, to).await
}

pub async fn block_number(&self) -> Result<u64, Error> {
self.0.block_number().await
self.contracts.block_number().await
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -154,7 +168,7 @@ impl EthApi {
payload: Vec<u8>,
proof: Vec<[u8; 32]>,
) -> Result<TxHash, Error> {
self.0
self.contracts
.provide_content_message(
U256::from(block_number),
U256::from(total_leaves),
Expand All @@ -169,7 +183,7 @@ impl EthApi {
}

pub async fn is_message_processed(&self, nonce: [u8; 32]) -> Result<bool, Error> {
self.0.is_message_processed(B256::from(nonce)).await
self.contracts.is_message_processed(B256::from(nonce)).await
}
}

Expand Down Expand Up @@ -197,6 +211,17 @@ where
})
}

pub async fn get_approx_balance(&self, address: Address) -> Result<f64, Error> {
let balance = self.provider.get_balance(address).latest().await;

if let Ok(balance) = balance {
let balance: f64 = balance.into();
Ok(balance / 1_000_000_000_000_000_000.0)
} else {
Err(Error::ErrorInHTTPTransport)
}
}

pub async fn provide_merkle_root(
&self,
block_number: U256,
Expand Down
8 changes: 4 additions & 4 deletions ethereum/src/libraries/Environment.sol
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
pragma solidity ^0.8.24;

address constant TREASURY_ADDRESS = address(
0xa513E6E4b8f2a923D98304ec87F64353C4D5C853
0x0165878A594ca255338adfa4d48449f69242Eb8F
);
address constant MESSAGE_QUEUE_ADDRESS = address(
0x0165878A594ca255338adfa4d48449f69242Eb8F
0x5FC8d32690cc91D4c39d9d3abcBD16989F875707
);
address constant RELAYER_ADDRESS = address(
0x5FC8d32690cc91D4c39d9d3abcBD16989F875707
0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9
);
address constant VERIFIER_ADDRESS = address(
0xe7f1725E7734CE288F8367e1Bb143E90bb3F0512
0x5FbDB2315678afecb367f032d93F642f64180aa3
);

// ALICE
Expand Down
5 changes: 3 additions & 2 deletions gear-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ impl GearApi {
.await?;
let mut current_set_block = self.block_hash_to_number(current_set_block).await?;

let mut prev_set_block = current_set_block - APPROX_SESSION_DURATION_IN_BLOCKS;
let mut prev_set_block =
current_set_block.saturating_sub(APPROX_SESSION_DURATION_IN_BLOCKS);

loop {
let prev_set_block_hash = self.block_number_to_hash(prev_set_block).await?;
Expand All @@ -120,7 +121,7 @@ impl GearApi {
break;
}

prev_set_block -= APPROX_SESSION_DURATION_IN_BLOCKS;
prev_set_block = prev_set_block.saturating_sub(APPROX_SESSION_DURATION_IN_BLOCKS);
}

loop {
Expand Down
22 changes: 13 additions & 9 deletions relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,25 +146,29 @@ async fn main() {
let gear_api = create_gear_client(&args.vara_endpoint).await;
let eth_api = create_eth_client(&args.ethereum_args);

let mut metrics = MetricsBuilder::new();

let proof_storage: Box<dyn ProofStorage> =
if let Some(fee_payer) = args.proof_storage_args.gear_fee_payer {
Box::from(
GearProofStorage::new(
&args.vara_endpoint.vara_endpoint,
&fee_payer,
"./onchain_proof_storage_data".into(),
)
.await
.expect("Failed to initilize proof storage"),
let proof_storage = GearProofStorage::new(
&args.vara_endpoint.vara_endpoint,
&fee_payer,
"./onchain_proof_storage_data".into(),
)
.await
.expect("Failed to initilize proof storage");

metrics = metrics.register_service(&proof_storage);

Box::from(proof_storage)
} else {
log::warn!("Fee payer not present, falling back to FileSystemProofStorage");
Box::from(FileSystemProofStorage::new("./proof_storage".into()))
};

let relayer = MerkleRootRelayer::new(gear_api, eth_api, proof_storage).await;

MetricsBuilder::new()
metrics
.register_service(&relayer)
.build()
.run(args.prometheus_args.endpoint)
Expand Down
63 changes: 60 additions & 3 deletions relayer/src/message_relayer/message_processor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use keccak_hash::keccak_256;
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
sync::mpsc::Receiver,
Expand All @@ -6,11 +7,11 @@ use std::{
use ethereum_client::{EthApi, TxHash, TxStatus};
use gear_rpc_client::{dto::Message, GearApi};
use primitive_types::H256;
use prometheus::{IntCounter, IntGauge};
use prometheus::{Gauge, IntCounter, IntGauge};

use crate::metrics::{impl_metered_service, MeteredService};

use super::{submit_message, AuthoritySetId, BlockEvent, BlockNumber, RelayedMerkleRoot};
use super::{AuthoritySetId, BlockEvent, BlockNumber, RelayedMerkleRoot};

pub struct MessageProcessor {
eth_api: EthApi,
Expand Down Expand Up @@ -67,7 +68,8 @@ struct RelayMessagePendingTx {

impl_metered_service! {
struct Metrics {
pending_tx_count: IntGauge
pending_tx_count: IntGauge,
fee_payer_balance: Gauge
}
}

Expand All @@ -91,6 +93,10 @@ impl Metrics {
"message_relayer_message_processor_pending_tx_count",
"Amount of txs pending finalization on ethereum",
)?,
fee_payer_balance: Gauge::new(
"message_relayer_message_processor_fee_payer_balance",
"Transaction fee payer balance",
)?,
})
}
}
Expand Down Expand Up @@ -128,6 +134,9 @@ impl MessageProcessor {
let mut paid_messages = HashSet::new();

loop {
let fee_payer_balance = self.eth_api.get_approx_balance().await?;
self.metrics.fee_payer_balance.set(fee_payer_balance);

for event in block_events.try_iter() {
match event {
BlockEvent::MessageSent { message } => {
Expand Down Expand Up @@ -364,3 +373,51 @@ impl Era {
}
}
}

async fn submit_message(
gear_api: &GearApi,
eth_api: &EthApi,
message: &Message,
merkle_root_block: u32,
merkle_root_block_hash: H256,
) -> anyhow::Result<TxHash> {
let message_hash = message_hash(message);

log::info!("Relaying message with hash {}", hex::encode(message_hash));

let proof = gear_api
.fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into())
.await?;

let tx_hash = eth_api
.provide_content_message(
merkle_root_block,
proof.num_leaves as u32,
proof.leaf_index as u32,
message.nonce_le,
message.source,
message.destination,
message.payload.to_vec(),
proof.proof,
)
.await?;

log::info!("Message #{:?} relaying started", message.nonce_le);

Ok(tx_hash)
}

fn message_hash(message: &Message) -> [u8; 32] {
let data = [
message.nonce_le.as_ref(),
message.source.as_ref(),
message.destination.as_ref(),
message.payload.as_ref(),
]
.concat();

let mut hash = [0; 32];
keccak_256(&data, &mut hash);

hash
}
54 changes: 1 addition & 53 deletions relayer/src/message_relayer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use ethereum_client::{EthApi, TxHash};
use ethereum_client::EthApi;
use gear_rpc_client::{dto::Message, GearApi};
use keccak_hash::keccak_256;
use primitive_types::{H256, U256};

use crate::metrics::MeteredService;
Expand Down Expand Up @@ -96,54 +95,3 @@ impl MessageRelayer {
Ok(())
}
}

async fn submit_message(
gear_api: &GearApi,
eth_api: &EthApi,
message: &Message,
merkle_root_block: u32,
merkle_root_block_hash: H256,
) -> anyhow::Result<TxHash> {
let message_hash = message_hash(message);

log::info!("Relaying message with hash {}", hex::encode(message_hash));

let proof = gear_api
.fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into())
.await?;

// TODO: Fully decode
let nonce = H256::from(message.nonce_le);

let tx_hash = eth_api
.provide_content_message(
merkle_root_block,
proof.num_leaves as u32,
proof.leaf_index as u32,
nonce.to_fixed_bytes(),
message.source,
message.destination,
message.payload.to_vec(),
proof.proof,
)
.await?;

log::info!("Message #{} relaying started", nonce);

Ok(tx_hash)
}

fn message_hash(message: &Message) -> [u8; 32] {
let data = [
message.nonce_le.as_ref(),
message.source.as_ref(),
message.destination.as_ref(),
message.payload.as_ref(),
]
.concat();

let mut hash = [0; 32];
keccak_256(&data, &mut hash);

hash
}
Loading

0 comments on commit 9ee28a7

Please sign in to comment.