Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #2852

Open
wants to merge 2 commits into
base: spr/main/9bf71741
Choose a base branch
from
Open

wip #2852

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/starknet_http_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
axum.workspace = true
hyper.workspace = true
infra_utils.workspace = true
metrics.workspace = true
papyrus_config.workspace = true
reqwest = { workspace = true, optional = true }
serde.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion crates/starknet_http_server/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tracing::{error, info, instrument};

use crate::config::HttpServerConfig;
use crate::errors::HttpServerRunError;
use crate::metrics::{count_added_transaction, count_transaction_status, init_metrics};

#[cfg(test)]
#[path = "http_server_test.rs"]
Expand All @@ -36,6 +37,7 @@ pub struct AppState {
impl HttpServer {
pub fn new(config: HttpServerConfig, gateway_client: SharedGatewayClient) -> Self {
let app_state = AppState { gateway_client };
init_metrics();
HttpServer { config, app_state }
}

Expand All @@ -62,12 +64,14 @@ async fn add_tx(
State(app_state): State<AppState>,
Json(tx): Json<RpcTransaction>,
) -> HttpServerResult<Json<TransactionHash>> {
let gateway_input: GatewayInput = GatewayInput { rpc_tx: tx.clone(), message_metadata: None };
count_added_transaction();
let gateway_input: GatewayInput = GatewayInput { rpc_tx: tx, message_metadata: None };

let add_tx_result = app_state.gateway_client.add_tx(gateway_input).await.map_err(|join_err| {
error!("Failed to process tx: {}", join_err);
GatewaySpecError::UnexpectedError { data: "Internal server error".to_owned() }
});
count_transaction_status(add_tx_result.is_ok());

add_tx_result_as_json(add_tx_result)
}
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_http_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod communication;
pub mod config;
pub mod errors;
pub mod http_server;
mod metrics;
#[cfg(feature = "testing")]
pub mod test_utils;
35 changes: 35 additions & 0 deletions crates/starknet_http_server/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use metrics::{absolute_counter, describe_counter, register_counter};

// TODO(Tsabary): add tests for metrics.
const ADDED_TRANSACTIONS_TOTAL: (&str, &str, u64) =
("ADDED_TRANSACTIONS_TOTAL", "Total number of transactions added", 0);
const ADDED_TRANSACTIONS_SUCCESS: (&str, &str, u64) =
("ADDED_TRANSACTIONS_SUCCESS", "Number of successfully added transactions", 0);
const ADDED_TRANSACTIONS_FAILURE: (&str, &str, u64) =
("ADDED_TRANSACTIONS_FAILURE", "Number of faulty added transactions", 0);

pub(crate) fn init_metrics() {
register_counter!(ADDED_TRANSACTIONS_TOTAL.0);
describe_counter!(ADDED_TRANSACTIONS_TOTAL.0, ADDED_TRANSACTIONS_TOTAL.1);
absolute_counter!(ADDED_TRANSACTIONS_TOTAL.0, ADDED_TRANSACTIONS_TOTAL.2);

register_counter!(ADDED_TRANSACTIONS_SUCCESS.0);
describe_counter!(ADDED_TRANSACTIONS_SUCCESS.0, ADDED_TRANSACTIONS_SUCCESS.1);
absolute_counter!(ADDED_TRANSACTIONS_SUCCESS.0, ADDED_TRANSACTIONS_SUCCESS.2);

register_counter!(ADDED_TRANSACTIONS_FAILURE.0);
describe_counter!(ADDED_TRANSACTIONS_FAILURE.0, ADDED_TRANSACTIONS_FAILURE.1);
absolute_counter!(ADDED_TRANSACTIONS_FAILURE.0, ADDED_TRANSACTIONS_FAILURE.2);
}

pub(crate) fn count_added_transaction() {
metrics::increment_counter!(ADDED_TRANSACTIONS_TOTAL.0);
}

pub(crate) fn count_transaction_status(add_tx_success: bool) {
if add_tx_success {
metrics::increment_counter!(ADDED_TRANSACTIONS_SUCCESS.0);
} else {
metrics::increment_counter!(ADDED_TRANSACTIONS_FAILURE.0);
}
}
3 changes: 2 additions & 1 deletion crates/starknet_monitoring_endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ repository.workspace = true
license-file.workspace = true

[features]
testing = ["tokio", "tower"]
testing = ["thiserror", "tokio", "tower"]

[lints]
workspace = true
Expand All @@ -19,6 +19,7 @@ metrics-exporter-prometheus.workspace = true
papyrus_config.workspace = true
serde.workspace = true
starknet_sequencer_infra.workspace = true
thiserror = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
tower = { workspace = true, optional = true }
tracing.workspace = true
Expand Down
39 changes: 38 additions & 1 deletion crates/starknet_monitoring_endpoint/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@ use hyper::client::HttpConnector;
use hyper::Client;
use infra_utils::run_until::run_until;
use infra_utils::tracing::{CustomLogger, TraceLevel};
use thiserror::Error;
use tracing::info;

use crate::monitoring_endpoint::{ALIVE, MONITORING_PREFIX};
use crate::monitoring_endpoint::{ALIVE, METRICS, MONITORING_PREFIX};

// TODO(Tsabary): rename IsAliveClient to MonitoringClient.

#[derive(Clone, Debug, Error, PartialEq, Eq)]
pub enum MonitoringClientError {
#[error(transparent)]
ConnectionError(#[from] hyper::Error),
}

/// Client for querying 'alive' status of an http server.
pub struct IsAliveClient {
Expand Down Expand Up @@ -46,6 +55,34 @@ impl IsAliveClient {
.ok_or(())
.map(|_| ())
}

pub async fn get_metrics(&self) -> Result<(), MonitoringClientError> {
let response = self
.client
.request(build_request(&self.socket.ip(), self.socket.port(), METRICS))
.await?;

// assert_eq!(response.status(), StatusCode::OK);
// let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
// let body_string = String::from_utf8(body_bytes.to_vec()).unwrap();
// let expected_prefix = format!(
// "# HELP {metric_name} {metric_help}\n# TYPE {metric_name} counter\n{metric_name} \
// {metric_value}\n\n"
// );
// assert!(body_string.starts_with(&expected_prefix));

// if !response.status().is_success() {
// return Err(());
// }

// let body = hyper::body::to_bytes(response.into_body())
// .await
// .map_err(|_| ())?
// .to_vec();

// info!("Metrics: {:?}", String::from_utf8(body).unwrap());
Ok(())
}
}

pub(crate) fn build_request(ip: &IpAddr, port: u16, method: &str) -> Request<Body> {
Expand Down
Loading