diff --git a/Cargo.lock b/Cargo.lock index ab9df2c232df9..9e057cd34f71a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14549,6 +14549,34 @@ dependencies = [ "url", ] +[[package]] +name = "sui-indexer-alt-jsonrpc-proxy" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum 0.8.3", + "axum-server", + "bytes", + "clap", + "dashmap", + "futures", + "hyper 1.5.2", + "moka", + "mysten-metrics", + "phf", + "prometheus", + "rand 0.8.5", + "reqwest 0.12.9", + "serde", + "serde_json", + "serde_with", + "serde_yaml 0.8.26", + "telemetry-subscribers", + "tokio", + "tracing", + "url", +] + [[package]] name = "sui-indexer-alt-metrics" version = "1.49.0" diff --git a/Cargo.toml b/Cargo.toml index 8d32d7ce1404a..4d2b54e247a8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,6 +128,7 @@ members = [ "crates/sui-indexer-alt-framework-store-traits", "crates/sui-indexer-alt-graphql", "crates/sui-indexer-alt-jsonrpc", + "crates/sui-indexer-alt-jsonrpc-proxy", "crates/sui-indexer-alt-metrics", "crates/sui-indexer-alt-reader", "crates/sui-indexer-alt-restorer", diff --git a/crates/mysten-metrics/src/lib.rs b/crates/mysten-metrics/src/lib.rs index 8cf2310fce3e0..23c716155c9b4 100644 --- a/crates/mysten-metrics/src/lib.rs +++ b/crates/mysten-metrics/src/lib.rs @@ -20,7 +20,7 @@ use prometheus::{ TextEncoder, }; use tap::TapFallible; -use tracing::{warn, Span}; +use tracing::{info,warn, Span}; pub use scopeguard; use uuid::Uuid; @@ -600,6 +600,7 @@ pub const METRICS_ROUTE: &str = "/metrics"; // and endpoint that prometheus agent can use to poll for the metrics. // A RegistryService is returned that can be used to get access in prometheus Registries. 
pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService { + info!("Inside start_prometheus_server, starting prometheus server at {}", addr); let registry = Registry::new(); let registry_service = RegistryService::new(registry); @@ -611,12 +612,16 @@ pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService { return registry_service; } + info!("Creating metrics router at {}", METRICS_ROUTE); let app = Router::new() .route(METRICS_ROUTE, get(metrics)) .layer(Extension(registry_service.clone())); + info!("Spawning tokio task to bind to {}", addr); tokio::spawn(async move { + info!("Binding to {}", addr); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + info!("Bound to {}", addr); axum::serve(listener, app.into_make_service()) .await .unwrap(); diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/Cargo.toml b/crates/sui-indexer-alt-jsonrpc-proxy/Cargo.toml new file mode 100644 index 0000000000000..82e33dc49b484 --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "sui-indexer-alt-jsonrpc-proxy" +version = "0.1.0" +edition = "2021" +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false + +[dependencies] +axum.workspace = true +axum-server.workspace = true +anyhow.workspace = true +bytes.workspace = true +clap.workspace = true +dashmap.workspace = true +futures.workspace = true +hyper.workspace = true +moka.workspace = true +phf = { version = "0.11", features = ["macros"] } +url = {workspace = true, features = ["serde"] } +tokio = { workspace = true, features = ["full"] } +tracing.workspace = true +serde.workspace = true +serde_with.workspace = true +serde_json.workspace = true +serde_yaml.workspace = true +reqwest.workspace = true +mysten-metrics.workspace = true +prometheus.workspace = true +telemetry-subscribers.workspace = true +rand = "0.8" diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/README.md b/crates/sui-indexer-alt-jsonrpc-proxy/README.md new file mode 
100644 index 0000000000000..3348605712e8e --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/README.md @@ -0,0 +1,12 @@ +# Sui indexer-alt-jsonrpc proxy + +This proxy is adapted from `sui-edge-proxy` to filter and transform requests sent to `indexer-alt-jsonrpc` server. +Unsupported methods can be configured via `unsupported-methods` configuration and are dropped by the proxy. +Since the cursor format used by alt rpc server is different, queries using cursors are transformed to a previously cached cursor before being proxied. + +## Run the proxy +`config.yaml` provides an example config file. Provide the config file and run like this +``` +cargo run -p sui-indexer-alt-jsonrpc-proxy -- --config <path-to-config.yaml> +``` + diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/config.yaml b/crates/sui-indexer-alt-jsonrpc-proxy/config.yaml new file mode 100644 index 0000000000000..6e2f1d8316dac --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/config.yaml @@ -0,0 +1,13 @@ +--- +listen-address: "127.0.0.1:8080" +metrics-address: "127.0.0.1:9184" + +fullnode-address: "https://rpc-fpa.sui-mainnet.mystenlabs.com/json-rpc" + +unsupported-methods: + - "sui_executeTransaction" + +allowed-origins: + - "https://apps-backend.sui.io" + +cursor-cache-size: 10000 diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/src/config.rs b/crates/sui-indexer-alt-jsonrpc-proxy/src/config.rs new file mode 100644 index 0000000000000..f9ec59c5e8ecc --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/src/config.rs @@ -0,0 +1,115 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use anyhow::{Context, Result}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use serde_with::DurationSeconds; +use std::{net::SocketAddr, time::Duration}; +use tracing::error; +use url::Url; + +#[serde_as] +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct ProxyConfig { + pub listen_address: SocketAddr, + pub metrics_address: SocketAddr, + /// The address of the fullnode to proxy requests to. + pub fullnode_address: Url, + /// Methods that are not proxied to the fullnode. + pub unsupported_methods: Vec, + /// The list of origins that are allowed to access the proxy. + pub allowed_origins: Option>, + /// The size of the cache for pagination cursors. + pub cursor_cache_size: u64, + /// Maximum number of idle connections to keep in the connection pool. + /// When set, this limits the number of connections that remain open but unused, + /// helping to conserve system resources. + #[serde(default = "default_max_idle_connections")] + pub max_idle_connections: usize, + /// Idle timeout for connections in the connection pool. + /// This should be set to a value less than the keep-alive timeout of the server to avoid sending requests to a closed connection. + /// If you expect sui-edge-proxy to receive a small number of requests per second, you should set this to a higher value. 
+ #[serde_as(as = "DurationSeconds")] + #[serde(default = "default_idle_timeout")] + pub idle_timeout_seconds: Duration, +} + +fn default_max_idle_connections() -> usize { + 100 +} + +fn default_idle_timeout() -> Duration { + Duration::from_secs(60) +} + +/// Load and validate configuration +pub async fn load>(path: P) -> Result<(ProxyConfig, Client)> { + let path = path.as_ref(); + let config: ProxyConfig = serde_yaml::from_reader( + std::fs::File::open(path).context(format!("cannot open {:?}", path))?, + )?; + + // Build a reqwest client that supports HTTP/2 + let client = reqwest::ClientBuilder::new() + .http2_prior_knowledge() + .http2_keep_alive_while_idle(true) + .pool_idle_timeout(config.idle_timeout_seconds) + .pool_max_idle_per_host(config.max_idle_connections) + .build() + .expect("Failed to build HTTP/2 client"); + + validate_fullnode_url(&client, &config.fullnode_address).await?; + + Ok((config, client)) +} + +/// Validate that the given PeerConfig URL has a valid host +async fn validate_fullnode_url(client: &Client, fullnode_address: &Url) -> Result<()> { + let health_url = fullnode_address + .join("/health") + .context("Failed to construct health check URL")?; + + const RETRY_DELAY: Duration = Duration::from_secs(1); + const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + + let mut attempt = 1; + loop { + match client + .get(health_url.clone()) + .timeout(REQUEST_TIMEOUT) + .send() + .await + { + Ok(response) => { + if response.version() != reqwest::Version::HTTP_2 { + tracing::warn!( + "Fullnode {:?} does not support HTTP/2 (using {:?})", + fullnode_address, + response.version() + ); + } + + if !response.status().is_success() { + tracing::warn!( + "Health check failed for fullnode {:?} with status {}", + fullnode_address, + response.status() + ); + } + return Ok(()); + } + Err(e) => { + error!( + "Failed to connect to fullnode {:?} (attempt {}): {}", + fullnode_address, attempt, e + ); + tokio::time::sleep(RETRY_DELAY).await; + attempt += 1; + 
continue; + } + } + } +} diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/src/cursor.rs b/crates/sui-indexer-alt-jsonrpc-proxy/src/cursor.rs new file mode 100644 index 0000000000000..ad9f3bce51309 --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/src/cursor.rs @@ -0,0 +1,195 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// Adapted from sui-rpc-benchmark/src/json_rpc/runner.rs + +use anyhow::{Context as _, Result}; +use moka::policy::EvictionPolicy; +use moka::sync::Cache; +use phf::phf_map; +use serde::Deserialize; +use serde_json::Value; +use tracing::{debug, error}; + +/// static map of method names to the index of their cursor parameter +static METHOD_CURSOR_POSITIONS: phf::Map<&'static str, usize> = phf_map! { + // based on function headers in crates/sui-json-rpc-api/src/indexer.rs + "suix_getOwnedObjects" => 2, + "suix_queryTransactionBlocks" => 1, + // based on function headers in crates/sui-json-rpc-api/src/coin.rs + "suix_getCoins" => 2, + "suix_getAllCoins" => 1, +}; + +static METHOD_LENGTHS: phf::Map<&'static str, usize> = phf_map! { + // based on function headers in crates/sui-json-rpc-api/src/indexer.rs + "suix_getOwnedObjects" => 4, + "suix_queryTransactionBlocks" => 4, + // based on function headers in crates/sui-json-rpc-api/src/coin.rs + "suix_getCoins" => 4, + "suix_getAllCoins" => 3, +}; + +/// Tracks pagination state for active pagination requests +/// The key is a tuple of method name and the params `Vec`, where the cursor parameter is set to `null`. +/// The value is the cursor for the next page. +#[derive(Clone, Debug)] +pub struct PaginationCursorState { + // TODO: potential optimization to condense the key so we can store more in the cache. 
+ requests: Cache<(String, Vec), Value>, +} + +impl PaginationCursorState { + pub fn new(cursor_cache_size: u64) -> Self { + Self { + requests: Cache::builder() + .max_capacity(cursor_cache_size) + .eviction_policy(EvictionPolicy::lru()) + .build(), + } + } + + /// Returns the index of the cursor parameter for a method, if it exists; + /// Otherwise, it means no cursor transformation is needed for this method. + pub fn get_method_cursor_index(method: &str) -> Option { + METHOD_CURSOR_POSITIONS.get(method).copied() + } + + /// Given a method and its parameters, returns the key to be used to store the cursor in the cache. + /// The cursor parameter will be set to `null` in the key. + fn get_method_key( + method: &str, + params: &[Value], + ) -> Result<(String, Vec), anyhow::Error> { + let cursor_idx = METHOD_CURSOR_POSITIONS + .get(method) + .with_context(|| format!("method {} not found in cursor positions", method))?; + let mut key_params = params.to_vec(); + if let Some(param_to_modify) = key_params.get_mut(*cursor_idx) { + *param_to_modify = Value::Null; + } else { + let method_length = METHOD_LENGTHS + .get(method) + .with_context(|| format!("method {} not found in method lengths", method))?; + key_params.resize(*method_length, Value::Null); + } + Ok((method.to_string(), key_params)) + } + + /// In place updates the cursor parameter in the `params` array of a request to the `new_cursor`. + fn update_params_cursor( + params: &mut Value, + cursor_idx: usize, + new_cursor: Option<&Value>, + method: &str, + ) -> Result<(), anyhow::Error> { + let params_array = params + .get_mut("params") + .and_then(|v| v.as_array_mut()) + .with_context(|| format!("params not found or not an array for method {}", method))?; + // If the cursor parameter is not present, extend the array to include it. 
+ if params_array.len() <= cursor_idx { + let method_length = METHOD_LENGTHS + .get(method) + .with_context(|| format!("method {} not found in method lengths", method))?; + params_array.resize(*method_length, Value::Null); + } + let param_to_modify = params_array.get_mut(cursor_idx).with_context(|| { + format!( + "Failed to access cursor parameter at index {} for method {}", + cursor_idx, method + ) + })?; + *param_to_modify = match new_cursor { + Some(cursor) => cursor.clone(), + None => Value::Null, + }; + Ok(()) + } + + /// Updates the stored cursor for a given method and parameters. + fn update(&self, key: (String, Vec), cursor: Option) { + if let Some(cursor) = cursor { + self.requests.insert(key, cursor); + } else { + self.requests.remove(&key); + } + } + + /// Returns a stored cursor for a given method and parameters. + /// The cursor value is originally read from the response of a successful previous request. + fn get(&self, key: &(String, Vec)) -> Option { + self.requests.get(key) + } +} + +/// Transforms the `json_body` of a request to update the cursor parameter to the cached value. +/// Returns true if the cursor was updated, false otherwise. +pub fn transform_json_body( + json_body: &mut Value, + method: &str, + params: &[Value], + pagination_state: &PaginationCursorState, +) -> Result { + if let Some(cursor_idx) = PaginationCursorState::get_method_cursor_index(method) { + if !params.is_empty() { + let method_key = PaginationCursorState::get_method_key(method, params)?; + PaginationCursorState::update_params_cursor( + json_body, + cursor_idx, + pagination_state.get(&method_key).as_ref(), + method, + )?; + return Ok(true); + } + } + Ok(false) +} + +/// Updates the pagination cursor cache based on the response of a successful request. 
+pub fn update_pagination_cursor_state( + resp: &bytes::Bytes, + method: &str, + params: &[Value], + pagination_state: &PaginationCursorState, +) -> Result<(), anyhow::Error> { + if PaginationCursorState::get_method_cursor_index(method).is_some() { + #[derive(Deserialize)] + struct Body { + result: Result, + } + #[derive(Debug, Deserialize)] + #[serde(rename_all = "camelCase")] + struct Result { + has_next_page: bool, + next_cursor: Option, + } + + let parse_result = serde_json::from_slice::(&resp); + + if let Ok(Body { result }) = parse_result { + let method_key = match PaginationCursorState::get_method_key(&method, ¶ms) { + Ok(key) => key, + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to get method key for method {}: {}", + method, + e + )) + } + }; + if result.has_next_page { + pagination_state.update(method_key, result.next_cursor.clone()); + } else { + pagination_state.update(method_key, None); + } + debug!( + "Updated pagination state for method: {method} with params: {params:?} to {next_cursor:?}", + next_cursor = result.next_cursor + ); + } else { + error!("Failed to parse response: {:?}", parse_result.err()); + } + } + Ok(()) +} diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/src/handlers.rs b/crates/sui-indexer-alt-jsonrpc-proxy/src/handlers.rs new file mode 100644 index 0000000000000..e4a0ab6d1ed7e --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/src/handlers.rs @@ -0,0 +1,296 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::cursor::{transform_json_body, update_pagination_cursor_state, PaginationCursorState}; +use crate::metrics::AppMetrics; +use axum::http::HeaderValue; +use axum::{ + body::Body, + extract::{Request, State}, + http::request::Parts, + http::StatusCode, + response::Response, +}; +use bytes::Bytes; +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Instant; +use tracing::{debug, info, warn}; +use url::Url; + +#[derive(Clone)] +pub struct AppState { + client: reqwest::Client, + fullnode_address: Url, + unsupported_methods: HashSet, + allowed_origins: Option>, + cursor_state: Arc, + metrics: AppMetrics, +} + +impl AppState { + pub fn new( + client: reqwest::Client, + fullnode_address: Url, + unsupported_methods: HashSet, + allowed_origins: Option>, + cursor_state: Arc, + metrics: AppMetrics, + ) -> Self { + info!( + "Creating app state with allowed origins: {:?} and unsupported methods: {:?}", + allowed_origins, unsupported_methods + ); + Self { + client, + fullnode_address, + unsupported_methods, + allowed_origins, + cursor_state, + metrics, + } + } +} + +pub async fn proxy_handler( + State(state): State, + request: Request, +) -> Result { + let (mut parts, body) = request.into_parts(); + + let mut body_bytes = match axum::body::to_bytes(body, 10 * 1024 * 1024).await { + Ok(bytes) => bytes, + Err(e) => { + warn!("Failed to read request body: {}", e); + return Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("Failed to read request body")) + .unwrap()); + } + }; + + debug!("Request method: {:?}, headers: {:?}", parts.method, parts.headers); + + if let Some(allowed_origins) = &state.allowed_origins { + match parts.headers.get("origin") { + Some(origin) => { + if !allowed_origins.contains(origin.to_str().unwrap()) { + debug!("Dropping request from origin: {}", origin.to_str().unwrap()); + return Ok(Response::builder() + .status(StatusCode::FORBIDDEN) + 
.body(Body::from("Forbidden")) + .unwrap()); + } + } + None => { + debug!("Dropping request with no origin header"); + return Ok(Response::builder() + .status(StatusCode::FORBIDDEN) + .body(Body::from("Forbidden")) + .unwrap()); + } + } + } + + match parts + .headers + .get("Client-Request-Method") + .and_then(|h| h.to_str().ok()) + { + Some(method) if state.unsupported_methods.contains(method) => { + debug!("Dropping {method} request"); + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(format!("Dropped {method} request"))) + .unwrap()) + } + _ => { + let mut json_body = match serde_json::from_slice::(&body_bytes) { + Ok(json_body) => json_body, + Err(_) => { + debug!("Failed to parse request body as JSON"); + return Ok(proxy_request(&state, parts, body_bytes).await?.0); + } + }; + let method = json_body + .get("method") + .and_then(|m| m.as_str()) + .map(|s| s.to_string()); + let params = json_body + .get("params") + .and_then(|p| p.as_array()) + .map(|a| a.to_vec()) + .unwrap_or_default(); + match method { + Some(method) if state.unsupported_methods.contains(&method) => { + info!("Dropping {method} request with params: {params:?}"); + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(format!("Dropped {method} request"))) + .unwrap()) + } + Some(method) => { + debug!("Transforming {method} request with params: {params:?}"); + match transform_json_body(&mut json_body, &method, ¶ms, &state.cursor_state) + { + Ok(true) => { + debug!("Transformed json_body: {json_body:?}"); + body_bytes = match serde_json::to_vec(&json_body) { + Ok(bytes) => Bytes::from(bytes), + Err(_) => { + return Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("Failed to serialize transformed JSON")) + .unwrap()); + } + }; + // Now that the content has changed, update content length header if it exists + if parts.headers.get("Content-Length").is_some() { + parts.headers.insert( + "Content-Length", + 
HeaderValue::from_str(&body_bytes.len().to_string()).unwrap(), + ); + } + } + Ok(false) => { + // Do nothing, no cursor transformation done so no need to update body bytes or content length header. + } + Err(_) => { + debug!("Failed to transform json_body: {json_body:?}"); + return Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("Failed to transform body json")) + .unwrap()); + } + } + let (response, response_bytes) = + proxy_request(&state, parts, body_bytes).await?; + if response.status().is_success() { + let res = update_pagination_cursor_state( + &response_bytes, + &method, + ¶ms, + &state.cursor_state, + ); + if res.is_err() { + warn!( + "Failed to update pagination cursor state: {}", + res.err().unwrap() + ); + } + } + Ok(response) + } + _ => { + // We can't find out what the method is so we directly proxy the request. + Ok(proxy_request(&state, parts, body_bytes).await?.0) + } + } + } + } +} + +async fn proxy_request( + state: &AppState, + parts: Parts, + body_bytes: Bytes, +) -> Result<(Response, bytes::Bytes), (StatusCode, String)> { + info!( + "Proxying request: method={:?}, uri={:?}, headers={:?}, body_len={}", + parts.method, + parts.uri, + parts.headers, + body_bytes.len(), + ); + + let metrics = &state.metrics; + let method_str = parts.method.as_str(); + + let timer_histogram = metrics.request_latency.with_label_values(&[method_str]); + let _timer = timer_histogram.start_timer(); + + metrics + .request_size_bytes + .with_label_values(&[method_str]) + .observe(body_bytes.len() as f64); + + let mut target_url = state.fullnode_address.clone(); + + if let Some(query) = parts.uri.query() { + target_url.set_query(Some(query)); + } + + // remove host header to avoid interfering with reqwest auto-host header + let mut headers = parts.headers.clone(); + headers.remove("host"); + let request_builder = state + .client + .request(parts.method.clone(), target_url) + .headers(headers) + .body(body_bytes.clone()); + 
info!("Request builder: {:?}", request_builder); + + let upstream_start = Instant::now(); + let response = match request_builder.send().await { + Ok(response) => { + let status = response.status().as_u16().to_string(); + metrics + .upstream_response_latency + .with_label_values(&[method_str, &status]) + .observe(upstream_start.elapsed().as_secs_f64()); + metrics + .requests_total + .with_label_values(&[method_str, &status]) + .inc(); + debug!("Response: {:?}", response); + response + } + Err(e) => { + warn!("Failed to send request: {}", e); + metrics + .upstream_response_latency + .with_label_values(&[method_str, "error"]) + .observe(upstream_start.elapsed().as_secs_f64()); + metrics + .requests_total + .with_label_values(&[method_str, "error"]) + .inc(); + if e.is_timeout() { + metrics + .timeouts_total + .with_label_values(&[method_str]) + .inc(); + } + return Err((StatusCode::BAD_GATEWAY, format!("Request failed: {}", e))); + } + }; + + let response_headers = response.headers().clone(); + let response_bytes = match response.bytes().await { + Ok(bytes) => bytes, + Err(e) => { + warn!("Failed to read response body: {}", e); + metrics + .error_counts + .with_label_values(&[method_str, "response_body_read"]) + .inc(); + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to read response body".to_string(), + )); + } + }; + metrics + .response_size_bytes + .with_label_values(&[method_str]) + .observe(response_bytes.len() as f64); + + let mut resp = Response::new(response_bytes.clone().into()); + for (name, value) in response_headers { + if let Some(name) = name { + resp.headers_mut().insert(name, value); + } + } + + Ok((resp, response_bytes)) +} diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/src/lib.rs b/crates/sui-indexer-alt-jsonrpc-proxy/src/lib.rs new file mode 100644 index 0000000000000..38e3f6e5d80b0 --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/src/lib.rs @@ -0,0 +1,7 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +pub mod config; +pub mod cursor; +pub mod handlers; +pub mod metrics; diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/src/main.rs b/crates/sui-indexer-alt-jsonrpc-proxy/src/main.rs new file mode 100644 index 0000000000000..69ef5374cf7c8 --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/src/main.rs @@ -0,0 +1,76 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use axum::{routing::any, Router}; +use clap::Parser; +use mysten_metrics::start_prometheus_server; +use reqwest::Client; +use std::sync::Arc; +use sui_indexer_alt_jsonrpc_proxy::config::{load, ProxyConfig}; +use sui_indexer_alt_jsonrpc_proxy::cursor::PaginationCursorState; +use sui_indexer_alt_jsonrpc_proxy::handlers::{proxy_handler, AppState}; +use sui_indexer_alt_jsonrpc_proxy::metrics::AppMetrics; +use tracing::info; + +#[derive(Parser, Debug)] +#[clap(rename_all = "kebab-case")] +struct Args { + #[clap( + long, + short, + default_value = "./config.yaml", + help = "Specify the config file path to use" + )] + config: String, +} + +#[tokio::main] +async fn main() { + info!("Starting sui-indexer-alt-jsonrpc-proxy"); + let args = Args::parse(); + + let (_guard, _filter_handle) = telemetry_subscribers::TelemetryConfig::new() + .with_env() + .init(); + info!("Initialized telemetry"); + + let (config, client): (ProxyConfig, Client) = + load(&args.config).await.expect("Failed to load config"); + info!("Loaded config: {:?}", config); + + let registry_service = start_prometheus_server(config.metrics_address); + info!("Started prometheus server at {}", config.metrics_address); + + let prometheus_registry = registry_service.default_registry(); + mysten_metrics::init_metrics(&prometheus_registry); + info!("Initialized metrics"); + + let app_metrics = AppMetrics::new(&prometheus_registry); + info!("Created app metrics"); + + // Create a single shared cursor state + let cursor_state = 
Arc::new(PaginationCursorState::new(config.cursor_cache_size)); + info!("Created cursor state"); + + let app_state = AppState::new( + client, + config.fullnode_address, + config.unsupported_methods.into_iter().collect(), + config + .allowed_origins + .map(|origins| origins.into_iter().collect()), + cursor_state, + app_metrics, + ); + info!("Created app state"); + + let app = Router::new() + .fallback(any(proxy_handler)) + .with_state(app_state); + + info!("Starting server on {}", config.listen_address); + axum_server::Server::bind(config.listen_address) + .serve(app.into_make_service()) + .await + .unwrap(); +} diff --git a/crates/sui-indexer-alt-jsonrpc-proxy/src/metrics.rs b/crates/sui-indexer-alt-jsonrpc-proxy/src/metrics.rs new file mode 100644 index 0000000000000..133e3a5fef2c1 --- /dev/null +++ b/crates/sui-indexer-alt-jsonrpc-proxy/src/metrics.rs @@ -0,0 +1,82 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use prometheus::{ + register_gauge_vec_with_registry, register_histogram_vec_with_registry, + register_int_counter_vec_with_registry, GaugeVec, HistogramVec, IntCounterVec, Registry, +}; + +#[derive(Clone)] +pub struct AppMetrics { + pub backend_up: GaugeVec, + pub requests_total: IntCounterVec, + pub request_latency: HistogramVec, + pub upstream_response_latency: HistogramVec, + pub response_size_bytes: HistogramVec, + pub request_size_bytes: HistogramVec, + pub timeouts_total: IntCounterVec, + pub error_counts: IntCounterVec, +} + +impl AppMetrics { + pub fn new(registry: &Registry) -> Self { + Self { + backend_up: register_gauge_vec_with_registry!( + "proxy_backend_up", + "Indicates if the backend is up (1) or down (0)", + &["method"], + registry + ) + .unwrap(), + requests_total: register_int_counter_vec_with_registry!( + "proxy_requests_total", + "Total number of requests processed by the edge proxy", + &["method", "status"], + registry + ) + .unwrap(), + request_latency: register_histogram_vec_with_registry!( + 
"proxy_request_latency", + "Request latency in seconds", + &["method"], + registry + ) + .unwrap(), + upstream_response_latency: register_histogram_vec_with_registry!( + "proxy_upstream_response_latency", + "Upstream response latency in seconds", + &["method", "status"], + registry + ) + .unwrap(), + response_size_bytes: register_histogram_vec_with_registry!( + "proxy_response_size_bytes", + "Size of responses in bytes", + &["method"], + registry + ) + .unwrap(), + request_size_bytes: register_histogram_vec_with_registry!( + "proxy_request_size_bytes", + "Size of incoming requests in bytes", + &["method"], + registry + ) + .unwrap(), + timeouts_total: register_int_counter_vec_with_registry!( + "proxy_timeouts_total", + "Total number of timed-out requests", + &["method"], + registry + ) + .unwrap(), + error_counts: register_int_counter_vec_with_registry!( + "proxy_error_counts", + "Total number of errors encountered by the edge proxy", + &["method", "error_type"], + registry + ) + .unwrap(), + } + } +} diff --git a/docker/sui-indexer-alt-jsonrpc-proxy/Dockerfile b/docker/sui-indexer-alt-jsonrpc-proxy/Dockerfile new file mode 100644 index 0000000000000..db48c6687dd01 --- /dev/null +++ b/docker/sui-indexer-alt-jsonrpc-proxy/Dockerfile @@ -0,0 +1,36 @@ +# Build application +# +# Copy in all crates, Cargo.toml and Cargo.lock unmodified, +# and build the application. 
+FROM rust:1.85-bullseye AS builder +ARG PROFILE=release +ARG GIT_REVISION +ENV GIT_REVISION=$GIT_REVISION +WORKDIR "$WORKDIR/sui" + +# sui-indexer need ca-certificates +RUN apt update && apt install -y ca-certificates postgresql + +RUN apt-get update && apt-get install -y cmake clang + +COPY Cargo.toml Cargo.lock ./ +COPY consensus consensus +COPY crates crates +COPY sui-execution sui-execution +COPY external-crates external-crates + +RUN cargo build --profile ${PROFILE} --bin sui-indexer-alt-jsonrpc-proxy + +# Production Image +FROM debian:bullseye-slim AS runtime +# Use jemalloc as memory allocator +RUN apt-get update && apt-get install -y libjemalloc-dev ca-certificates curl +ENV LD_PRELOAD /usr/lib/x86_64-linux-gnu/libjemalloc.so +WORKDIR "$WORKDIR/sui" +COPY --from=builder /sui/target/release/sui-indexer-alt-jsonrpc-proxy /usr/local/bin +RUN apt update && apt install -y ca-certificates postgresql + +ARG BUILD_DATE +ARG GIT_REVISION +LABEL build-date=$BUILD_DATE +LABEL git-revision=$GIT_REVISION diff --git a/docker/sui-indexer-alt-jsonrpc-proxy/build.sh b/docker/sui-indexer-alt-jsonrpc-proxy/build.sh new file mode 100755 index 0000000000000..6bbb534aff1c7 --- /dev/null +++ b/docker/sui-indexer-alt-jsonrpc-proxy/build.sh @@ -0,0 +1,25 @@ +#!/bin/sh +# Copyright (c) Mysten Labs, Inc. +# SPDX-License-Identifier: Apache-2.0 + +# fast fail. +set -e + +DIR="$( cd "$( dirname "$0" )" && pwd )" +REPO_ROOT="$(git rev-parse --show-toplevel)" +DOCKERFILE="$DIR/Dockerfile" +GIT_REVISION="$(git describe --always --abbrev=12 --dirty --exclude '*')" +BUILD_DATE="$(date -u +'%Y-%m-%d')" + +echo +echo "Building sui-indexer-alt-jsonrpc-proxy docker image" +echo "Dockerfile: \t$DOCKERFILE" +echo "docker context: $REPO_ROOT" +echo "build date: \t$BUILD_DATE" +echo "git revision: \t$GIT_REVISION" +echo + +docker build -f "$DOCKERFILE" "$REPO_ROOT" \ + --build-arg GIT_REVISION="$GIT_REVISION" \ + --build-arg BUILD_DATE="$BUILD_DATE" \ + "$@" \ No newline at end of file