diff --git a/crates/agent/src/api/authorize_dekaf.rs b/crates/agent/src/api/authorize_dekaf.rs new file mode 100644 index 0000000000..4dd759fd67 --- /dev/null +++ b/crates/agent/src/api/authorize_dekaf.rs @@ -0,0 +1,197 @@ +use super::App; +use crate::api::snapshot::Snapshot; +use anyhow::Context; +use models::CatalogType; +use std::sync::Arc; + +type Request = models::authorizations::TaskAuthorizationRequest; +type Response = models::authorizations::DekafAuthResponse; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct AccessTokenClaims { + exp: u64, + iat: u64, + role: String, +} + +#[axum::debug_handler] +pub async fn authorize_dekaf( + axum::extract::State(app): axum::extract::State>, + axum::Json(request): axum::Json, +) -> axum::response::Response { + super::wrap(async move { do_authorize_dekaf(&app, &request).await }).await +} + +const DEKAF_ROLE: &str = "dekaf"; + +/// Dekaf straddles the control-plane and data-plane: +/// * It needs full control over the `registered_avro_schemas` table in the control-plane +/// in order to serve its schema registry functionality +/// * It needs access to the full specs of materializations both in order to authenticate +/// sessions, as well as figure out which bindings to expose under what names, etc. +/// * It needs to read from journals in order to serve topic data to consumers +/// * It needs to write to ops logs and stats for observability and billing concerns +/// +/// This endpoint provides it a way to do all of these things, while also staying within +/// the authorization framework used by other data-plane actors. +/// +/// Specifically, this checks that: +/// * Your request is coming from an authorized actor in a data-plane, +/// * That actor is acting on behalf of a task running in that same data-plane. 
+/// +/// Once we've authenticated and authorized the request as best we can, we put together +/// a package of all the information Dekaf needs in one place: +/// * A short-lived control-plane access token to authorize requests under the `dekaf` role +/// which has grants to the `public.registered_avro_schemas` table +/// * The `models::MaterializationDef` for the materialization being requested +/// as identified by the `sub` JWT claim +/// * The names of the ops logs and ops stats journals for that materialization +#[tracing::instrument(skip(app), err(level = tracing::Level::WARN))] +async fn do_authorize_dekaf(app: &App, Request { token }: &Request) -> anyhow::Result { + let jsonwebtoken::TokenData { header, claims }: jsonwebtoken::TokenData = + { + // In this pass we do not validate the signature, + // because we don't yet know which data-plane the JWT is signed by. + let empty_key = jsonwebtoken::DecodingKey::from_secret(&[]); + let mut validation = jsonwebtoken::Validation::default(); + validation.insecure_disable_signature_validation(); + jsonwebtoken::decode(token, &empty_key, &validation) + }?; + tracing::debug!(?claims, ?header, "decoded authorization request"); + + let task_name = claims.sub.as_str(); + if task_name.is_empty() { + anyhow::bail!("missing required materialization name (`sub` claim)"); + } + + let shard_data_plane_fqdn = claims.iss.as_str(); + if shard_data_plane_fqdn.is_empty() { + anyhow::bail!("missing required task data-plane FQDN (`iss` claim)"); + } + + if claims.cap != proto_flow::capability::AUTHORIZE { + anyhow::bail!("invalid capability, must be AUTHORIZE only: {}", claims.cap); + } + + match Snapshot::evaluate(&app.snapshot, claims.iat, |snapshot: &Snapshot| { + evaluate_authorization(snapshot, task_name, shard_data_plane_fqdn, token) + }) { + Ok((ops_logs_journal, ops_stats_journal)) => { + let materialization_spec = sqlx::query!( + r#" + select + spec as "spec!: sqlx::types::Json", + spec_type as "spec_type!: models::CatalogType" + from live_specs + where live_specs.catalog_name = $1 + "#, + 
task_name + ) + .fetch_one(&app.pg_pool) + .await + .context("failed to fetch task spec")?; + + if !matches!( + materialization_spec.spec_type, + models::CatalogType::Materialization + ) { + anyhow::bail!("Unexpected spec type {}", materialization_spec.spec_type); + } + + let unix_ts = jsonwebtoken::get_current_timestamp(); + let claims = AccessTokenClaims { + iat: unix_ts, + exp: unix_ts + (60 * 60), + role: DEKAF_ROLE.to_string(), + }; + + let signed = jsonwebtoken::encode( + &jsonwebtoken::Header::default(), + &claims, + &app.control_plane_jwt_signer, + )?; + + Ok(Response { + token: signed, + ops_logs_journal, + ops_stats_journal, + task_spec: Some(materialization_spec.spec.0), + retry_millis: 0, + }) + } + Err(Ok(retry_millis)) => Ok(Response { + retry_millis, + ..Default::default() + }), + Err(Err(err)) => Err(err), + } +} + +fn evaluate_authorization( + snapshot: &Snapshot, + task_name: &str, + shard_data_plane_fqdn: &str, + token: &str, +) -> anyhow::Result<(String, String)> { + tracing::debug!(?task_name, "Task name"); + // Map `claims.sub`, a task name, into its task. + let task = snapshot.task_by_catalog_name(&models::Name::new(task_name.to_string())); + + // Map `claims.iss`, a data-plane FQDN, into its task-matched data-plane. 
+ let task_data_plane = task.and_then(|task| { + snapshot + .data_planes + .get_by_key(&task.data_plane_id) + .filter(|data_plane| data_plane.data_plane_fqdn == shard_data_plane_fqdn) + }); + + let (Some(task), Some(task_data_plane)) = (task, task_data_plane) else { + tracing::debug!( + ?task, + ?task_data_plane, + ?snapshot.tasks, + "failed to find matching task in data plane" + ); + anyhow::bail!("task {task_name} within data-plane {shard_data_plane_fqdn} is not known") + }; + + if task.spec_type != CatalogType::Materialization { + anyhow::bail!( + "task {task_name} must be a materialization, but is {:?} instead", + task.spec_type + ) + } + + // Attempt to find an HMAC key of this data-plane which validates against the request token. + let validation = jsonwebtoken::Validation::default(); + let mut verified = false; + + for hmac_key in &task_data_plane.hmac_keys { + let key = jsonwebtoken::DecodingKey::from_base64_secret(hmac_key) + .context("invalid data-plane hmac key")?; + + if jsonwebtoken::decode::(token, &key, &validation).is_ok() { + verified = true; + break; + } + } + if !verified { + anyhow::bail!("no data-plane keys validated against the token signature"); + } + + let (Some(ops_logs), Some(ops_stats)) = ( + snapshot.collection_by_catalog_name(&task_data_plane.ops_logs_name), + snapshot.collection_by_catalog_name(&task_data_plane.ops_stats_name), + ) else { + anyhow::bail!( + "couldn't resolve data-plane {} ops collections", + task.data_plane_id + ) + }; + + let ops_suffix = super::ops_suffix(task); + let ops_logs_journal = format!("{}{}", ops_logs.journal_template_name, &ops_suffix[1..]); + let ops_stats_journal = format!("{}{}", ops_stats.journal_template_name, &ops_suffix[1..]); + + Ok((ops_logs_journal, ops_stats_journal)) +} diff --git a/crates/agent/src/api/authorize_role.rs b/crates/agent/src/api/authorize_role.rs deleted file mode 100644 index 9029b9bffc..0000000000 --- a/crates/agent/src/api/authorize_role.rs +++ /dev/null @@ -1,158 +0,0 @@ -use 
super::App; -use crate::api::snapshot::Snapshot; -use anyhow::Context; -use std::sync::Arc; - -type Request = models::authorizations::TaskAuthorizationRequest; -type Response = models::authorizations::RoleAuthorization; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -struct AccessTokenClaims { - exp: u64, - iat: u64, - role: String, -} - -#[axum::debug_handler] -pub async fn authorize_role( - axum::extract::State(app): axum::extract::State>, - axum::extract::Path(role): axum::extract::Path, - axum::Json(request): axum::Json, -) -> axum::response::Response { - super::wrap(async move { do_authorize_role(&app, role, &request).await }).await -} - -/// Provides a way for data-plane components to get access to the control-plane. -/// -/// Specifically, this checks that: -/// * Your request is coming from an authorized actor in a data-plane, -/// * That actor is acting on behalf of a task running in that same data-plane. -/// It shares most of its logic with `/authorize/task`, except that instead of -/// authorizing access to a particular collection via a task, it simply validates -/// that the specified task is running in the same data-plane as the requesting actor, -/// and then returns a control-plane access token authorized to act as the requested role. -/// -/// NOTE: Instead of authorizing access to a role which then has access to particular -/// tables in the control plane, ideally this endpoint would return a signed token -/// that includes the specific catalog name it's authorized for. This would require control-plane -/// support for catalog-prefix-scoped tokens which we don't have at the moment, so instead we use roles. 
-#[tracing::instrument(skip(app), err(level = tracing::Level::WARN))] -async fn do_authorize_role( - app: &App, - role: models::authorizations::AllowedRole, - Request { token }: &Request, -) -> anyhow::Result { - let jsonwebtoken::TokenData { header, claims }: jsonwebtoken::TokenData = - { - // In this pass we do not validate the signature, - // because we don't yet know which data-plane the JWT is signed by. - let empty_key = jsonwebtoken::DecodingKey::from_secret(&[]); - let mut validation = jsonwebtoken::Validation::default(); - validation.insecure_disable_signature_validation(); - jsonwebtoken::decode(token, &empty_key, &validation) - }?; - tracing::debug!(?claims, ?header, "decoded authorization request"); - - let shard_id = claims.sub.as_str(); - if shard_id.is_empty() { - anyhow::bail!("missing required shard ID (`sub` claim)"); - } - - let shard_data_plane_fqdn = claims.iss.as_str(); - if shard_data_plane_fqdn.is_empty() { - anyhow::bail!("missing required shard data-plane FQDN (`iss` claim)"); - } - - if claims.cap & proto_flow::capability::AUTHORIZE == 0 { - anyhow::bail!("missing required AUTHORIZE capability: {}", claims.cap); - } - - match Snapshot::evaluate(&app.snapshot, claims.iat, |snapshot: &Snapshot| { - evaluate_authorization(snapshot, shard_id, shard_data_plane_fqdn, token) - }) { - Ok(()) => { - let unix_ts = jsonwebtoken::get_current_timestamp(); - let claims = AccessTokenClaims { - iat: unix_ts, - exp: unix_ts + (60 * 60), - role: role.to_string(), - }; - - let signed = jsonwebtoken::encode( - &jsonwebtoken::Header::default(), - &claims, - &app.control_plane_jwt_signer, - )?; - - Ok(Response { - token: signed, - retry_millis: 0, - }) - } - Err(Ok(retry_millis)) => Ok(Response { - retry_millis, - ..Default::default() - }), - Err(Err(err)) => Err(err), - } -} - -fn evaluate_authorization( - snapshot: &Snapshot, - shard_id: &str, - shard_data_plane_fqdn: &str, - token: &str, -) -> anyhow::Result<()> { - let Snapshot { - data_planes, tasks, .. 
- } = snapshot; - // Map `claims.sub`, a Shard ID, into its task. - let task = tasks - .binary_search_by(|task| { - if shard_id.starts_with(&task.shard_template_id) { - std::cmp::Ordering::Equal - } else { - task.shard_template_id.as_str().cmp(shard_id) - } - }) - .ok() - .map(|index| &tasks[index]); - - // Map `claims.iss`, a data-plane FQDN, into its task-matched data-plane. - let task_data_plane = task.and_then(|task| { - data_planes - .get_by_key(&task.data_plane_id) - .filter(|data_plane| data_plane.data_plane_fqdn == shard_data_plane_fqdn) - }); - - let (Some(_), Some(task_data_plane)) = (task, task_data_plane) else { - tracing::debug!( - ?task, - ?task_data_plane, - ?tasks, - "failed to find matching task in data plane" - ); - anyhow::bail!( - "task shard {shard_id} within data-plane {shard_data_plane_fqdn} is not known" - ) - }; - - // Attempt to find an HMAC key of this data-plane which validates against the request token. - let validation = jsonwebtoken::Validation::default(); - let mut verified = false; - - for hmac_key in &task_data_plane.hmac_keys { - let key = jsonwebtoken::DecodingKey::from_base64_secret(hmac_key) - .context("invalid data-plane hmac key")?; - - if jsonwebtoken::decode::(token, &key, &validation).is_ok() { - verified = true; - break; - } - } - if !verified { - anyhow::bail!("no data-plane keys validated against the token signature"); - } - - Ok(()) -} diff --git a/crates/agent/src/api/authorize_task.rs b/crates/agent/src/api/authorize_task.rs index 8dbb5ef1bd..ad48644405 100644 --- a/crates/agent/src/api/authorize_task.rs +++ b/crates/agent/src/api/authorize_task.rs @@ -93,13 +93,7 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re required_role, ) }) { - Ok(( - encoding_key, - data_plane_fqdn, - broker_address, - ops_logs_journal, - ops_stats_journal, - )) => { + Ok((encoding_key, data_plane_fqdn, broker_address)) => { claims.iss = data_plane_fqdn; claims.exp = claims.iat + super::exp_seconds(); @@ 
-109,8 +103,6 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re Ok(Response { broker_address, token, - ops_logs_journal, - ops_stats_journal, ..Default::default() }) } @@ -129,7 +121,7 @@ fn evaluate_authorization( token: &str, journal_name_or_prefix: &str, required_role: models::Capability, -) -> anyhow::Result<(jsonwebtoken::EncodingKey, String, String, String, String)> { +) -> anyhow::Result<(jsonwebtoken::EncodingKey, String, String)> { let Snapshot { collections, data_planes, @@ -244,20 +236,6 @@ fn evaluate_authorization( }; let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key)?; - let (Some(ops_logs), Some(ops_stats)) = ( - snapshot.collection_by_catalog_name(&collection_data_plane.ops_logs_name), - snapshot.collection_by_catalog_name(&collection_data_plane.ops_stats_name), - ) else { - anyhow::bail!( - "couldn't resolve data-plane {} ops collections", - task.data_plane_id - ) - }; - - let ops_suffix = super::ops_suffix(task); - let ops_logs_journal = format!("{}{}", ops_logs.journal_template_name, &ops_suffix[1..]); - let ops_stats_journal = format!("{}{}", ops_stats.journal_template_name, &ops_suffix[1..]); - Ok(( encoding_key, collection_data_plane.data_plane_fqdn.clone(), @@ -265,7 +243,5 @@ fn evaluate_authorization( task.data_plane_id != collection.data_plane_id, &collection_data_plane.broker_address, ), - ops_logs_journal, - ops_stats_journal, )) } diff --git a/crates/agent/src/api/mod.rs b/crates/agent/src/api/mod.rs index 6b154a94e9..f63b40e5a1 100644 --- a/crates/agent/src/api/mod.rs +++ b/crates/agent/src/api/mod.rs @@ -1,7 +1,7 @@ use axum::{http::StatusCode, response::IntoResponse}; use std::sync::{Arc, Mutex}; -mod authorize_role; +mod authorize_dekaf; mod authorize_task; mod authorize_user_collection; mod authorize_user_task; @@ -104,10 +104,7 @@ pub fn build_router( let schema_router = axum::Router::new() .route("/authorize/task", post(authorize_task::authorize_task)) - .route( - 
"/authorize/role/:role_name", - post(authorize_role::authorize_role), - ) + .route("/authorize/dekaf", post(authorize_dekaf::authorize_dekaf)) .route( "/authorize/user/task", post(authorize_user_task::authorize_user_task) diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index 538df2de97..edc9e8e37a 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -157,7 +157,7 @@ pub async fn fetch_task_authorization( data_plane_signer: &jsonwebtoken::EncodingKey, capability: u32, selector: gazette::broker::LabelSelector, -) -> anyhow::Result<(String, String, gazette::journal::Client)> { +) -> anyhow::Result { let request_token = build_task_authorization_request_token( shard_template_id, data_plane_fqdn, @@ -169,8 +169,6 @@ pub async fn fetch_task_authorization( let models::authorizations::TaskAuthorization { broker_address, token, - ops_logs_journal, - ops_stats_journal, retry_millis: _, } = loop { let response: models::authorizations::TaskAuthorization = client @@ -202,7 +200,7 @@ pub async fn fetch_task_authorization( .journal_client .with_endpoint_and_metadata(broker_address, md); - Ok((ops_logs_journal, ops_stats_journal, journal_client)) + Ok(journal_client) } fn build_task_authorization_request_token( @@ -230,61 +228,6 @@ fn build_task_authorization_request_token( Ok(signed_request_token) } -// Claims returned by `/authorize/role` -#[derive(Debug, Clone, serde::Deserialize)] -pub struct RoleTokenClaims { - pub role: String, - pub iat: u64, - pub exp: u64, -} - -#[tracing::instrument(skip(client, data_plane_signer), err)] -pub async fn fetch_control_plane_authorization( - client: Client, - role: models::authorizations::AllowedRole, - shard_template_id: &str, - data_plane_fqdn: &str, - data_plane_signer: &jsonwebtoken::EncodingKey, - capability: u32, - selector: gazette::broker::LabelSelector, -) -> anyhow::Result<(Client, RoleTokenClaims)> { - let request_token = build_task_authorization_request_token( - 
shard_template_id, - data_plane_fqdn, - data_plane_signer, - capability, - selector, - )?; - - let models::authorizations::RoleAuthorization { - token, - retry_millis: _, - } = loop { - let response: models::authorizations::RoleAuthorization = client - .agent_unary( - format!("/authorize/role/{}", role.to_string()).as_str(), - &models::authorizations::TaskAuthorizationRequest { - token: request_token.clone(), - }, - ) - .await?; - - if response.retry_millis != 0 { - tracing::warn!( - secs = response.retry_millis as f64 / 1000.0, - "authorization service tentatively rejected our request, but will retry before failing" - ); - () = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await; - continue; - } - break response; - }; - - let claims = parse_jwt_claims(token.as_str())?; - - Ok((client.with_user_access_token(Some(token)), claims)) -} - #[tracing::instrument(skip(client), err)] pub async fn fetch_user_task_authorization( client: &Client, diff --git a/crates/flow-client/src/lib.rs b/crates/flow-client/src/lib.rs index 38a870e07f..a0cffd9873 100644 --- a/crates/flow-client/src/lib.rs +++ b/crates/flow-client/src/lib.rs @@ -2,8 +2,7 @@ use anyhow::Context; pub mod client; pub use client::{ - fetch_collection_authorization, fetch_control_plane_authorization, fetch_task_authorization, - fetch_user_task_authorization, Client, + fetch_collection_authorization, fetch_task_authorization, fetch_user_task_authorization, Client, }; pub mod pagination; diff --git a/crates/models/src/authorizations.rs b/crates/models/src/authorizations.rs index 471d60edad..016135cc5b 100644 --- a/crates/models/src/authorizations.rs +++ b/crates/models/src/authorizations.rs @@ -46,12 +46,6 @@ pub struct TaskAuthorization { /// # Address of Gazette brokers for the issued token. 
#[serde(default, skip_serializing_if = "String::is_empty")] pub broker_address: String, - // Name of the journal that contains the logs for the specified task - #[serde(default, skip_serializing_if = "String::is_empty")] - pub ops_logs_journal: String, - // Name of the journal that contains the stats for the specified task - #[serde(default, skip_serializing_if = "String::is_empty")] - pub ops_stats_journal: String, /// # Number of milliseconds to wait before retrying the request. /// Non-zero if and only if token is not set. pub retry_millis: u64, @@ -154,28 +148,21 @@ pub struct UserTaskAuthorization { #[serde(default, skip_serializing_if = "String::is_empty")] pub shard_id_prefix: String, } - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum AllowedRole { - #[serde(rename = "dekaf")] - Dekaf, -} - -impl fmt::Display for AllowedRole { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let role_str = match self { - AllowedRole::Dekaf => "dekaf", - }; - write!(f, "{}", role_str) - } -} - #[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] #[serde(rename_all = "camelCase")] -pub struct RoleAuthorization { +pub struct DekafAuthResponse { /// # Control plane access token with the requested role #[serde(default, skip_serializing_if = "String::is_empty")] pub token: String, + // Name of the journal that contains the logs for the specified task + #[serde(default, skip_serializing_if = "String::is_empty")] + pub ops_logs_journal: String, + // Name of the journal that contains the stats for the specified task + #[serde(default, skip_serializing_if = "String::is_empty")] + pub ops_stats_journal: String, + // Spec of the task + #[serde(default, skip_serializing_if = "Option::is_none")] + pub task_spec: Option, /// # Number of milliseconds to wait before retrying the request. /// Non-zero if and only if token is not set. pub retry_millis: u64,