Skip to content

Add rpc alt proxy for request filtering and transformation #22181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ members = [
"crates/sui-indexer-alt-framework-store-traits",
"crates/sui-indexer-alt-graphql",
"crates/sui-indexer-alt-jsonrpc",
"crates/sui-indexer-alt-jsonrpc-proxy",
"crates/sui-indexer-alt-metrics",
"crates/sui-indexer-alt-reader",
"crates/sui-indexer-alt-restorer",
Expand Down
7 changes: 6 additions & 1 deletion crates/mysten-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use prometheus::{
TextEncoder,
};
use tap::TapFallible;
use tracing::{warn, Span};
use tracing::{info,warn, Span};

pub use scopeguard;
use uuid::Uuid;
Expand Down Expand Up @@ -600,6 +600,7 @@ pub const METRICS_ROUTE: &str = "/metrics";
// and endpoint that prometheus agent can use to poll for the metrics.
// A RegistryService is returned that can be used to get access in prometheus Registries.
pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
info!("Inside start_prometheus_server, starting prometheus server at {}", addr);
let registry = Registry::new();

let registry_service = RegistryService::new(registry);
Expand All @@ -611,12 +612,16 @@ pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService {
return registry_service;
}

info!("Creating metrics router at {}", METRICS_ROUTE);
let app = Router::new()
.route(METRICS_ROUTE, get(metrics))
.layer(Extension(registry_service.clone()));

info!("Spawning tokio task to bind to {}", addr);
tokio::spawn(async move {
info!("Binding to {}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
info!("Bound to {}", addr);
axum::serve(listener, app.into_make_service())
.await
.unwrap();
Expand Down
31 changes: 31 additions & 0 deletions crates/sui-indexer-alt-jsonrpc-proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "sui-indexer-alt-jsonrpc-proxy"
version = "0.1.0"
edition = "2021"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false

[dependencies]
axum.workspace = true
axum-server.workspace = true
anyhow.workspace = true
bytes.workspace = true
clap.workspace = true
dashmap.workspace = true
futures.workspace = true
hyper.workspace = true
moka.workspace = true
phf = { version = "0.11", features = ["macros"] }
url = {workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
reqwest.workspace = true
mysten-metrics.workspace = true
prometheus.workspace = true
telemetry-subscribers.workspace = true
rand = "0.8"
12 changes: 12 additions & 0 deletions crates/sui-indexer-alt-jsonrpc-proxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Sui indexer-alt-jsonrpc proxy

This proxy is adapted from `sui-edge-proxy` to filter and transform requests sent to `indexer-alt-jsonrpc` server.
Unsupported methods can be configured via `unsupported-methods` configuration and are dropped by the proxy.
Since the cursor format used by alt rpc server is different, queries using cursors are transformed to a previously cached cursor before being proxied.

## Run the proxy
`config.yaml` provides an example config file. Provide the config file and run like this
```
cargo run -p sui-indexer-alt-jsonrpc-proxy -- --config <config-file-path>
```

10 changes: 10 additions & 0 deletions crates/sui-indexer-alt-jsonrpc-proxy/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
listen-address: "127.0.0.1:8080"
metrics-address: "127.0.0.1:9184"

fullnode-address: "https://rpc-fpa.sui-mainnet.mystenlabs.com/json-rpc"

unsupported-methods:
- "sui_executeTransaction"

cursor-cache-size: 10000
114 changes: 114 additions & 0 deletions crates/sui-indexer-alt-jsonrpc-proxy/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Context, Result};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::DurationSeconds;
use std::{net::SocketAddr, time::Duration};
use tracing::error;
use url::Url;

#[serde_as]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct ProxyConfig {
pub listen_address: SocketAddr,
pub metrics_address: SocketAddr,
/// The address of the fullnode to proxy requests to.
pub fullnode_address: Url,
/// Methods that are not proxied to the fullnode.
pub unsupported_methods: Vec<String>,
/// The size of the cache for pagination cursors.
pub cursor_cache_size: u64,
/// Maximum number of idle connections to keep in the connection pool.
/// When set, this limits the number of connections that remain open but unused,
/// helping to conserve system resources.
#[serde(default = "default_max_idle_connections")]
pub max_idle_connections: usize,
/// Idle timeout for connections in the connection pool.
/// This should be set to a value less than the keep-alive timeout of the server to avoid sending requests to a closed connection.
/// if your you expect sui-edge-proxy to recieve a small number of requests per second, you should set this to a higher value.
#[serde_as(as = "DurationSeconds")]
#[serde(default = "default_idle_timeout")]
pub idle_timeout_seconds: Duration,
// TODO: add allowed origin list
}

fn default_max_idle_connections() -> usize {
100
}

fn default_idle_timeout() -> Duration {
Duration::from_secs(60)
}

/// Load and validate configuration
pub async fn load<P: AsRef<std::path::Path>>(path: P) -> Result<(ProxyConfig, Client)> {
let path = path.as_ref();
let config: ProxyConfig = serde_yaml::from_reader(
std::fs::File::open(path).context(format!("cannot open {:?}", path))?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::fs::File::open(path).context(format!("cannot open {:?}", path))?,
std::fs::File::open(path)
.with_context(|| format!("Failed to open ProxyConfig at {}", path.display()))?,

)?;

// Build a reqwest client that supports HTTP/2
let client = reqwest::ClientBuilder::new()
.http2_prior_knowledge()
.http2_keep_alive_while_idle(true)
.pool_idle_timeout(config.idle_timeout_seconds)
.pool_max_idle_per_host(config.max_idle_connections)
.build()
.expect("Failed to build HTTP/2 client");

validate_fullnode_url(&client, &config.fullnode_address).await?;

Ok((config, client))
}

/// Validate that the given PeerConfig URL has a valid host
async fn validate_fullnode_url(client: &Client, fullnode_address: &Url) -> Result<()> {
let health_url = fullnode_address
.join("/health")
.context("Failed to construct health check URL")?;

const RETRY_DELAY: Duration = Duration::from_secs(1);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);

let mut attempt = 1;
loop {
match client
.get(health_url.clone())
.timeout(REQUEST_TIMEOUT)
.send()
.await
{
Ok(response) => {
if response.version() != reqwest::Version::HTTP_2 {
tracing::warn!(
"Fullnode {:?} does not support HTTP/2 (using {:?})",
fullnode_address,
response.version()
);
}

if !response.status().is_success() {
tracing::warn!(
"Health check failed for fullnode {:?} with status {}",
fullnode_address,
response.status()
);
Comment on lines +87 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tracing::warn!(
"Fullnode {:?} does not support HTTP/2 (using {:?})",
fullnode_address,
response.version()
);
}
if !response.status().is_success() {
tracing::warn!(
"Health check failed for fullnode {:?} with status {}",
fullnode_address,
response.status()
);
warn!(
address = ?fullnode_address,
version = ?response.version(),
"Fullnode does not support HTTP/2",
);
}
if !response.status().is_success() {
warn!(
address = ?fullnode_address,
status = %response.status(),
"Health check failed for fullnode",
);

}
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the health check fails (or the fullnode is not using HTTP/2) is the fullnode considered validated?

}
Err(e) => {
error!(
"Failed to connect to fullnode {:?} (attempt {}): {}",
fullnode_address, attempt, e
Comment on lines +105 to +106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Failed to connect to fullnode {:?} (attempt {}): {}",
fullnode_address, attempt, e
address = ?fullnode_address,
attempt
"Failed to connect to fullnode: {e}",

);
tokio::time::sleep(RETRY_DELAY).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using sleep in a loop, it would be better to use an interval.

attempt += 1;
continue;
}
}
}
}
Loading
Loading