diff --git a/Cargo.lock b/Cargo.lock index 1f034460dd..0b6e2a7672 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,7 +110,7 @@ dependencies = [ "build", "bytes", "chrono", - "clap 4.5.17", + "clap 4.5.21", "colored_json", "dekaf", "derivative", @@ -865,7 +865,7 @@ dependencies = [ "anyhow", "async-stripe", "chrono", - "clap 4.5.17", + "clap 4.5.21", "futures", "itertools 0.10.5", "serde", @@ -1271,9 +1271,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.17" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -1281,9 +1281,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.17" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -1293,9 +1293,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.13" +version = "4.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" +checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -1406,7 +1406,7 @@ dependencies = [ "async-process", "async-trait", "bytes", - "clap 4.5.17", + "clap 4.5.21", "futures", "insta", "ops", @@ -1812,7 +1812,7 @@ dependencies = [ "async-process", "automations", "chrono", - "clap 4.5.17", + "clap 4.5.21", "futures", "humantime", "humantime-serde", @@ -1872,7 +1872,7 @@ dependencies = [ "base64 0.13.1", "bumpalo", "bytes", - "clap 4.5.17", + "clap 4.5.21", "crypto-common", "deadpool", "doc", @@ -2341,7 +2341,7 @@ version = "0.0.0" dependencies = [ "anyhow", "atty", - "clap 4.5.17", + "clap 4.5.21", "tracing", "tracing-subscriber", ] @@ -2358,7 +2358,7 @@ dependencies = [ "build", "bytelines", "bytes", - "clap 4.5.17", + "clap 4.5.21", "comfy-table", "connector-init", "coroutines", @@ -3558,7 +3558,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -4297,7 +4297,7 @@ dependencies = [ "caseless", "chardetng", "chrono", - "clap 4.5.17", + "clap 4.5.21", "criterion", "csv", "doc", @@ -4708,7 +4708,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.13.0", "log", "multimap", @@ -5331,7 +5331,7 @@ dependencies = [ "assemble", "async-process", "bytes", - "clap 4.5.17", + "clap 4.5.21", "connector-init", "coroutines", "derive-sqlite", @@ -5582,7 +5582,7 @@ version = "0.0.0" dependencies = [ "anyhow", "bytes", - "clap 4.5.17", + "clap 4.5.21", "doc", "flow_cli_common", "indexmap 1.9.3", diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 38fb4bdce5..c4fac32f2a 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -25,14 +25,11 @@ pub use api_client::KafkaApiClient; use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser}; use connector::{DekafConfig, DeletionMode}; use flow_client::client::{ - fetch_control_plane_authorization, fetch_task_authorization, refresh_authorizations, - RefreshToken, + fetch_control_plane_authorization, refresh_authorizations, RefreshToken, }; -use models::{authorizations::ControlClaims, Materialization}; use percent_encoding::{percent_decode_str, utf8_percent_encode}; -use proto_flow::flow::MaterializationSpec; use serde::{Deserialize, Serialize}; -use std::time::{Instant, SystemTime}; +use std::time::SystemTime; pub struct App { /// Hostname which is advertised for Kafka access. @@ -70,12 +67,12 @@ pub struct UserAuth { pub struct TaskAuth { client: flow_client::Client, - name: String, + task_name: String, config: DekafConfig, bindings: Vec, - // Needed to refresh auth token - claims: ControlClaims, + // When access token expires + exp: time::OffsetDateTime, } pub enum SessionAuthentication { @@ -151,23 +148,21 @@ impl TaskAuth { &mut self, app: &App, ) -> anyhow::Result<&flow_client::Client> { - if self.claims.time_remaining().whole_seconds() < 60 { + if (self.exp - time::OffsetDateTime::now_utc()).whole_seconds() < 60 { let (client, claims) = fetch_control_plane_authorization( self.client.clone(), models::authorizations::AllowedRole::Dekaf, - &dekaf_shard_template_id(&self.name), + &dekaf_shard_template_id(&self.task_name), &app.data_plane_fqdn, &app.data_plane_signer, - proto_flow::capability::AUTHORIZE & proto_gazette::capability::READ, - gazette::broker::LabelSelector { - include: Some(labels::build_set([(labels::TASK_NAME, self.name.as_str())])), - exclude: None, - }, + proto_flow::capability::AUTHORIZE, + Default::default(), ) .await?; self.client = client.with_fresh_gazette_client(); - self.claims = claims; + self.exp = + time::OffsetDateTime::UNIX_EPOCH + time::Duration::seconds(claims.exp as i64); } Ok(&self.client) @@ -197,11 +192,8 @@ impl App { &dekaf_shard_template_id(&username), &self.data_plane_fqdn, &self.data_plane_signer, - proto_flow::capability::AUTHORIZE & proto_gazette::capability::READ, - gazette::broker::LabelSelector { - include: Some(labels::build_set([(labels::TASK_NAME, username.as_str())])), - exclude: None, - }, + proto_flow::capability::AUTHORIZE, + Default::default(), ) .await?; @@ -220,11 +212,11 @@ impl App { } Ok(SessionAuthentication::Task(TaskAuth { - name: username, + task_name: username, config, bindings: task_spec.bindings.clone(), client, - claims, + exp: time::OffsetDateTime::UNIX_EPOCH + time::Duration::seconds(claims.exp as i64), })) } else if username.contains("{") { let raw_token = String::from_utf8(base64::decode(password)?.to_vec())?; @@ -596,7 +588,7 @@ fn decode_safe_name(safe_name: String) -> anyhow::Result { /// used throughout the data-plane. Dekaf materializations, on the other hand, have predictable /// shard template IDs since they never publish shards whose names could conflict. fn dekaf_shard_template_id(task_name: &str) -> String { - format!("materialize/{task_name}/0000000000000000") + format!("materialize/{task_name}/0000000000000000/") } /// Modified from [this](https://github.com/serde-rs/serde/issues/368#issuecomment-1579475447) diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 733e99802b..190b01d9fd 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -6,7 +6,9 @@ use axum_server::tls_rustls::RustlsConfig; use clap::{Args, Parser}; use dekaf::{KafkaApiClient, Session}; use flow_client::{ - DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, LOCAL_PG_PUBLIC_TOKEN, LOCAL_PG_URL, + DEFAULT_AGENT_URL, DEFAULT_DATA_PLANE_FQDN, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, + LOCAL_AGENT_URL, LOCAL_DATA_PLANE_FQDN, LOCAL_DATA_PLANE_HMAC, LOCAL_PG_PUBLIC_TOKEN, + LOCAL_PG_URL, }; use futures::{FutureExt, TryStreamExt}; use rsasl::config::SASLConfig; @@ -29,6 +31,7 @@ pub struct Cli { #[arg( long, default_value = DEFAULT_PG_URL.as_str(), + default_value_if("local", "true", Some(LOCAL_PG_URL.as_str())), env = "API_ENDPOINT" )] api_endpoint: Url, @@ -36,13 +39,22 @@ pub struct Cli { #[arg( long, default_value = DEFAULT_PG_PUBLIC_TOKEN, + default_value_if("local", "true", Some(LOCAL_PG_PUBLIC_TOKEN)), env = "API_KEY" )] api_key: String, + /// Endpoint of the Estuary agent API to use. + #[arg( + long, + default_value = DEFAULT_AGENT_URL.as_str(), + default_value_if("local", "true", Some(LOCAL_AGENT_URL.as_str())), + env = "AGENT_ENDPOINT" + )] + agent_endpoint: Url, /// When true, override the configured API endpoint and token, /// in preference of a local control plane. - #[arg(long)] + #[arg(long, action(clap::ArgAction::SetTrue))] local: bool, /// The hostname to advertise when enumerating Kafka "brokers". /// This is the hostname at which `dekaf` may be accessed. @@ -87,11 +99,25 @@ pub struct Cli { idle_session_timeout: std::time::Duration, /// The fully-qualified domain name of the data plane that Dekaf is running inside of - #[arg(long, env = "DATA_PLANE_FQDN")] + #[arg( + long, + env = "DATA_PLANE_FQDN", + default_value=DEFAULT_DATA_PLANE_FQDN, + default_value_if("local", "true", Some(LOCAL_DATA_PLANE_FQDN)), + )] data_plane_fqdn: String, /// An HMAC key recognized by the data plane that Dekaf is running inside of. Used to /// sign data-plane access token requests. - #[arg(long, env = "DATA_PLANE_ACCESS_KEY")] + #[arg( + long, + env = "DATA_PLANE_ACCESS_KEY", + default_value_if("local", "true", Some(LOCAL_DATA_PLANE_HMAC)), + // This is a work-around to clap_derive's somewhat buggy handling of `default_value_if`. + // The end result is that `data_plane_access_key` is required iff `--local` is not specified. + // If --local is specified, it will use the value in LOCAL_DATA_PLANE_HMAC unless overridden. + // See https://github.com/clap-rs/clap/issues/4086 and https://github.com/clap-rs/clap/issues/4918 + required(false) + )] data_plane_access_key: String, #[command(flatten)] @@ -125,12 +151,6 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); tracing::info!("Starting dekaf"); - let (api_endpoint, api_key) = if cli.local { - (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string()) - } else { - (cli.api_endpoint, cli.api_key) - }; - let upstream_kafka_host = format!( "tcp://{}:{}", cli.default_broker_hostname, cli.default_broker_port @@ -154,9 +174,9 @@ async fn main() -> anyhow::Result<()> { data_plane_signer: jsonwebtoken::EncodingKey::from_base64_secret(&cli.data_plane_access_key)?, data_plane_fqdn: cli.data_plane_fqdn, client_base: flow_client::Client::new( - DEFAULT_AGENT_URL.to_owned(), - api_key, - api_endpoint, + cli.agent_endpoint, + cli.api_key, + cli.api_endpoint, None, ) }); diff --git a/crates/dekaf/src/registry.rs b/crates/dekaf/src/registry.rs index f5708c8062..78cd9d825f 100644 --- a/crates/dekaf/src/registry.rs +++ b/crates/dekaf/src/registry.rs @@ -1,7 +1,5 @@ use super::App; -use crate::{ - from_downstream_topic_name, to_downstream_topic_name, topology, SessionAuthentication, -}; +use crate::{from_downstream_topic_name, to_downstream_topic_name, SessionAuthentication}; use anyhow::Context; use axum::response::{IntoResponse, Response}; use axum_extra::headers; diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 57294d75d1..16afe58651 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -1,11 +1,12 @@ use crate::{ - connector::{DekafConfig, DeletionMode}, + connector::{DekafConfig, DekafResourceConfig, DeletionMode}, dekaf_shard_template_id, App, SessionAuthentication, TaskAuth, UserAuth, }; -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, Context}; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use gazette::{broker, journal, uuid}; -use models::Materialization; +use itertools::Itertools; +use models::{Materialization, MaterializationBinding}; use proto_flow::flow; use std::time::Duration; @@ -38,8 +39,32 @@ impl TaskAuth { Ok(self .bindings .iter() - .map(|b| b.source.collection().to_string()) - .collect()) + .map(|b| { + serde_json::from_value::( + b.resource.to_value(), + ) + }) + .map_ok(|val| val.topic_name) + .collect::, _>>()?) + } + + pub fn get_binding_for_topic( + &self, + topic_name: &str, + ) -> anyhow::Result> { + Ok(self + .bindings + .iter() + .map(|b| { + serde_json::from_value::( + b.resource.to_value(), + ) + .map(|parsed| (b, parsed)) + }) + .collect::, _>>()? + .into_iter() + .find(|(_, parsed_config)| parsed_config.topic_name == topic_name) + .map(|(binding, config)| (binding.clone(), config))) } } @@ -50,6 +75,19 @@ impl SessionAuthentication { SessionAuthentication::Task(auth) => auth.fetch_all_collection_names().await, } } + + pub fn get_collection_for_topic(&self, topic_name: &str) -> anyhow::Result { + match self { + SessionAuthentication::User(_) => Ok(topic_name.to_string()), + SessionAuthentication::Task(auth) => { + let (binding, _resource_config) = auth + .get_binding_for_topic(topic_name)? + .ok_or(anyhow::anyhow!("Unrecognized topic {topic_name}"))?; + + Ok(binding.source.collection().to_string()) + } + } + } } pub async fn get_dekaf_materialization( @@ -59,30 +97,28 @@ pub async fn get_dekaf_materialization( #[derive(serde::Deserialize)] struct Row { spec_type: models::CatalogType, - spec: String, + spec: models::MaterializationDef, } - let row = client - .from("live_specs") - .eq("catalog_name", name.clone()) - .select("spec, spec_type") - .single() - .execute() - .await? - .json::() - .await?; + let row: Row = handle_postgrest_response( + client + .from("live_specs") + .eq("catalog_name", name.clone()) + .select("spec, spec_type") + .single(), + ) + .await + .context("fetching materialization spec")?; if !matches!(row.spec_type, models::CatalogType::Materialization) { bail!("Unexpected spec type {}", row.spec_type); } else { - let parsed_spec = serde_json::from_str::(&row.spec)?; - - match &parsed_spec.endpoint { + match &row.spec.endpoint { models::MaterializationEndpoint::Dekaf(dekaf_endpoint) => { let decrypted = runtime::unseal::decrypt_sops(&dekaf_endpoint.config).await?; let dekaf_config = serde_json::from_value::(decrypted.to_value())?; - Ok((parsed_spec, dekaf_config)) + Ok((row.spec, dekaf_config)) } models::MaterializationEndpoint::Connector(_) | models::MaterializationEndpoint::Local(_) => { @@ -105,6 +141,7 @@ pub struct Collection { } /// Partition is a collection journal which is mapped into a stable Kafka partition order. +#[derive(Debug)] pub struct Partition { pub create_revision: i64, pub spec: broker::JournalSpec, @@ -127,40 +164,43 @@ impl Collection { app: &App, auth: &SessionAuthentication, pg_client: &postgrest::Postgrest, - collection: &str, + topic_name: &str, ) -> anyhow::Result> { let not_before = uuid::Clock::default(); - if let SessionAuthentication::Task(TaskAuth { - ref bindings, - ref name, - .. - }) = auth - { - if let Some(binding) = bindings - .iter() - .find(|b| b.source.collection().as_str() == collection) - { + if let SessionAuthentication::Task(task_auth) = auth { + if let Some((binding, _)) = task_auth.get_binding_for_topic(topic_name)? { if binding.disable { - bail!("Binding {collection} is disabled in {name}") + bail!( + "Binding for topic {topic_name} is disabled in {}", + task_auth.task_name + ) } } else { - bail!("Collection {collection} is not a binding of {name}") + bail!( + "Unable to find binding for topic {topic_name} is not a binding of {}", + task_auth.task_name + ) } } - // Build a journal client and use it to fetch partitions while concurrently - // fetching the collection's metadata from the control plane. - let client_partitions = async { - let journal_client = Self::build_journal_client(app, collection).await?; - let partitions = Self::fetch_partitions(&journal_client, collection).await?; - Ok((journal_client, partitions)) + let collection_name = &auth.get_collection_for_topic(topic_name)?; + + let Some(spec) = Self::fetch_spec(&pg_client, collection_name).await? else { + return Ok(None); }; - let (spec, client_partitions): (anyhow::Result<_>, anyhow::Result<_>) = - futures::join!(Self::fetch_spec(&pg_client, collection), client_partitions); + let partition_template_name = spec + .partition_template + .as_ref() + .map(|spec| spec.name.to_owned()) + .ok_or(anyhow!("missing partition template"))?; + + let journal_client = + Self::build_journal_client(app, &auth, collection_name, &partition_template_name) + .await?; + let partitions = Self::fetch_partitions(&journal_client, collection_name).await?; - let Some(spec) = spec? else { return Ok(None) }; - let (journal_client, partitions) = client_partitions?; + tracing::debug!(?partitions, "Got partitions"); let key_ptr: Vec = spec.key.iter().map(|p| doc::Pointer::from_str(p)).collect(); @@ -183,7 +223,7 @@ impl Collection { let (key_schema, value_schema) = avro::shape_to_avro(shape, &key_ptr); tracing::debug!( - collection, + collection_name, partitions = partitions.len(), "built collection" ); @@ -224,17 +264,15 @@ impl Collection { built_spec: flow::CollectionSpec, } - let mut rows: Vec = client - .from("live_specs_ext") - .eq("spec_type", "collection") - .eq("catalog_name", collection) - .select("built_spec") - .execute() - .await - .and_then(|r| r.error_for_status()) - .context("listing current collection specifications")? - .json() - .await?; + let mut rows: Vec = handle_postgrest_response( + client + .from("live_specs_ext") + .eq("spec_type", "collection") + .eq("catalog_name", collection) + .select("built_spec"), + ) + .await + .context("listing current collection specifications")?; if let Some(Row { built_spec }) = rows.pop() { Ok(Some(built_spec)) @@ -244,6 +282,7 @@ impl Collection { } /// Fetch the journals of a collection and map into stable-order partitions. + #[tracing::instrument(skip(journal_client))] async fn fetch_partitions( journal_client: &journal::Client, collection: &str, @@ -255,11 +294,14 @@ impl Collection { }), ..Default::default() }; + + tracing::debug!(?request, "Fetching partitions"); let response = tokio::time::timeout(Duration::from_secs(5), journal_client.list(request)) .await .map_err(|e| { anyhow::anyhow!("timed out fetching partitions for {collection}: {e}") })??; + tracing::debug!(?response, "Fetched partitions"); let mut partitions = Vec::with_capacity(response.journals.len()); for journal in response.journals { @@ -346,25 +388,56 @@ impl Collection { } /// Build a journal client by resolving the collections data-plane gateway and an access token. - async fn build_journal_client(app: &App, collection: &str) -> anyhow::Result { - let (_ops_logs_journal, _ops_stats_journal, journal_client) = tokio::time::timeout( - Duration::from_secs(5), - flow_client::client::fetch_task_authorization( - &app.client_base, - &dekaf_shard_template_id(collection), - &app.data_plane_fqdn, - &app.data_plane_signer, - proto_flow::capability::AUTHORIZE & proto_gazette::capability::READ, - gazette::broker::LabelSelector { - include: Some(labels::build_set([(labels::TASK_NAME, collection)])), - exclude: None, - }, - ), - ) - .map_err(|e| anyhow::anyhow!("timed out building journal client for {collection}: {e}")) - .await??; - - Ok(journal_client) + async fn build_journal_client( + app: &App, + auth: &SessionAuthentication, + collection_name: &str, + partition_template_name: &str, + ) -> anyhow::Result { + match auth { + SessionAuthentication::User(user_auth) => { + let (_, journal_client) = tokio::time::timeout( + Duration::from_secs(5), + flow_client::fetch_user_collection_authorization( + &user_auth.client, + collection_name, + ), + ) + .map_err(|e| { + anyhow::anyhow!("timed out building journal client for {collection_name}: {e}") + }) + .await??; + + Ok(journal_client) + } + SessionAuthentication::Task(task_auth) => { + let (_ops_logs_journal, _ops_stats_journal, journal_client) = tokio::time::timeout( + Duration::from_secs(30), + flow_client::client::fetch_task_authorization( + &app.client_base, + &dekaf_shard_template_id(&task_auth.task_name), + &app.data_plane_fqdn, + &app.data_plane_signer, + proto_flow::capability::AUTHORIZE + | proto_gazette::capability::LIST + | proto_gazette::capability::READ, + gazette::broker::LabelSelector { + include: Some(labels::build_set([( + "name:prefix", + format!("{partition_template_name}/").as_str(), + )])), + exclude: None, + }, + ), + ) + .map_err(|e| { + anyhow::anyhow!("timed out building journal client for {collection_name}: {e}") + }) + .await??; + + Ok(journal_client) + } + } } async fn registered_schema_id( @@ -384,36 +457,30 @@ impl Collection { let schema: serde_json::Value = serde_json::from_str(&schema.canonical_form()).unwrap(); let schema_md5 = format!("{:x}", md5::compute(&schema.to_string())); - let mut rows: Vec = client - .from("registered_avro_schemas") - .eq("avro_schema_md5", &schema_md5) - .select("registry_id") - .execute() - .await - .and_then(|r| r.error_for_status()) - .context("querying for an already-registered schema")? - .json() - .await?; + let mut rows: Vec = handle_postgrest_response( + client + .from("registered_avro_schemas") + .eq("avro_schema_md5", &schema_md5) + .select("registry_id"), + ) + .await + .context("querying for an already-registered schema")?; if let Some(Row { registry_id }) = rows.pop() { return Ok(registry_id); } - let mut rows: Vec = client - .from("registered_avro_schemas") - .insert( + let mut rows: Vec = handle_postgrest_response( + client.from("registered_avro_schemas").insert( serde_json::json!([{ "avro_schema": schema, "catalog_name": catalog_name, }]) .to_string(), - ) - .execute() - .await - .and_then(|r| r.error_for_status()) - .context("inserting new registered schema")? - .json() - .await?; + ), + ) + .await + .context("inserting new registered schema")?; let registry_id = rows.pop().unwrap().registry_id; tracing::info!(schema_md5, registry_id, "registered new Avro schema"); @@ -421,3 +488,20 @@ impl Collection { Ok(registry_id) } } + +async fn handle_postgrest_response( + builder: postgrest::Builder, +) -> anyhow::Result { + let resp = builder.execute().await?; + let status = resp.status(); + + if status.is_client_error() || status.is_server_error() { + bail!( + "{}: {}", + status.canonical_reason().unwrap_or(status.as_str()), + resp.text().await? + ) + } else { + Ok(resp.json().await?) + } +} diff --git a/crates/flow-client/src/lib.rs b/crates/flow-client/src/lib.rs index 38a870e07f..5aed64c3e3 100644 --- a/crates/flow-client/src/lib.rs +++ b/crates/flow-client/src/lib.rs @@ -66,3 +66,6 @@ lazy_static::lazy_static! { pub const DEFAULT_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImV5cmNubXV6enlyaXlwZGFqd2RrIiwicm9sZSI6ImFub24iLCJpYXQiOjE2NDg3NTA1NzksImV4cCI6MTk2NDMyNjU3OX0.y1OyXD3-DYMz10eGxzo1eeamVMMUwIIeOoMryTRAoco"; pub const LOCAL_PG_PUBLIC_TOKEN: &str = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0"; +pub const LOCAL_DATA_PLANE_HMAC: &str = "c3VwZXJzZWNyZXQ="; +pub const LOCAL_DATA_PLANE_FQDN: &str = "local-cluster.dp.estuary-data.com"; +pub const DEFAULT_DATA_PLANE_FQDN: &str = "gcp-us-central1-c1.dp.estuary-data.com";