Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
agent: Swap /authorize/role/:role for /authorize/dekaf
Browse files Browse the repository at this point in the history
In addition to avoiding premature generalization, we can now also return relevant metadata in one request, such as ops catalog names and task spec.
jshearer committed Nov 26, 2024

Verified

This commit was signed with the committer’s verified signature.
jschlyter Jakob Schlyter
1 parent 31b4243 commit 1c24d32
Showing 6 changed files with 84 additions and 141 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::App;
use crate::api::snapshot::Snapshot;
use anyhow::Context;
use models::CatalogType;
use std::sync::Arc;

type Request = models::authorizations::TaskAuthorizationRequest;
type Response = models::authorizations::RoleAuthorization;
type Response = models::authorizations::DekafAuthResponse;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct AccessTokenClaims {
@@ -14,14 +15,15 @@ struct AccessTokenClaims {
}

#[axum::debug_handler]
pub async fn authorize_role(
pub async fn authorize_dekaf(
axum::extract::State(app): axum::extract::State<Arc<App>>,
axum::extract::Path(role): axum::extract::Path<models::authorizations::AllowedRole>,
axum::Json(request): axum::Json<Request>,
) -> axum::response::Response {
super::wrap(async move { do_authorize_role(&app, role, &request).await }).await
super::wrap(async move { do_authorize_dekaf(&app, &request).await }).await
}

const DEKAF_ROLE: &str = "dekaf";

/// Provides a way for data-plane components to get access to the control-plane.
///
/// Specifically, this checks that:
@@ -37,11 +39,7 @@ pub async fn authorize_role(
/// that includes the specific catalog name it's authorized for. This would require control-plane
/// support for catalog-prefix-scoped tokens which we don't have at the moment, so instead we use roles.
#[tracing::instrument(skip(app), err(level = tracing::Level::WARN))]
async fn do_authorize_role(
app: &App,
role: models::authorizations::AllowedRole,
Request { token }: &Request,
) -> anyhow::Result<Response> {
async fn do_authorize_dekaf(app: &App, Request { token }: &Request) -> anyhow::Result<Response> {
let jsonwebtoken::TokenData { header, claims }: jsonwebtoken::TokenData<proto_gazette::Claims> =
{
// In this pass we do not validate the signature,
@@ -53,29 +51,50 @@ async fn do_authorize_role(
}?;
tracing::debug!(?claims, ?header, "decoded authorization request");

let shard_id = claims.sub.as_str();
if shard_id.is_empty() {
anyhow::bail!("missing required shard ID (`sub` claim)");
let task_name = claims.sub.as_str();
if task_name.is_empty() {
anyhow::bail!("missing required materialization name (`sub` claim)");
}

let shard_data_plane_fqdn = claims.iss.as_str();
if shard_data_plane_fqdn.is_empty() {
anyhow::bail!("missing required shard data-plane FQDN (`iss` claim)");
anyhow::bail!("missing required task data-plane FQDN (`iss` claim)");
}

if claims.cap & proto_flow::capability::AUTHORIZE == 0 {
anyhow::bail!("missing required AUTHORIZE capability: {}", claims.cap);
if claims.cap != proto_flow::capability::AUTHORIZE {
anyhow::bail!("invalid capability, must be AUTHORIZE only: {}", claims.cap);
}

match Snapshot::evaluate(&app.snapshot, claims.iat, |snapshot: &Snapshot| {
evaluate_authorization(snapshot, shard_id, shard_data_plane_fqdn, token)
evaluate_authorization(snapshot, task_name, shard_data_plane_fqdn, token)
}) {
Ok(()) => {
Ok((ops_logs_journal, ops_stats_journal)) => {
let materialization_spec = sqlx::query!(
r#"
select
spec as "spec!: models::MaterializationDef",
spec_type as "spec_type!: models::CatalogType"
from live_specs
where live_specs.catalog_name = $1
"#,
task_name
)
.fetch_one(app.pg_pool)
.await
.context("failed to fetch task spec")?;

if !matches!(
materialization_spec.spec_type,
models::CatalogType::Materialization
) {
bail!("Unexpected spec type {}", row.spec_type);
}

let unix_ts = jsonwebtoken::get_current_timestamp();
let claims = AccessTokenClaims {
iat: unix_ts,
exp: unix_ts + (60 * 60),
role: role.to_string(),
role: DEKAF_ROLE.to_string(),
};

let signed = jsonwebtoken::encode(
@@ -86,6 +105,9 @@ async fn do_authorize_role(

Ok(Response {
token: signed,
ops_logs_journal,
ops_stats_journal,
task_spec: Some(materialization_spec.spec),
retry_millis: 0,
})
}
@@ -99,20 +121,20 @@ async fn do_authorize_role(

fn evaluate_authorization(
snapshot: &Snapshot,
shard_id: &str,
task_name: &str,
shard_data_plane_fqdn: &str,
token: &str,
) -> anyhow::Result<()> {
let Snapshot {
data_planes, tasks, ..
} = snapshot;
// Map `claims.sub`, a Shard ID, into its task.
// Map `claims.sub`, a task name, into its task.
let task = tasks
.binary_search_by(|task| {
if shard_id.starts_with(&task.shard_template_id) {
if task_name.starts_with(&task.task_name) {
std::cmp::Ordering::Equal
} else {
task.shard_template_id.as_str().cmp(shard_id)
task.task_name.as_str().cmp(task_name)
}
})
.ok()
@@ -125,17 +147,22 @@ fn evaluate_authorization(
.filter(|data_plane| data_plane.data_plane_fqdn == shard_data_plane_fqdn)
});

let (Some(_), Some(task_data_plane)) = (task, task_data_plane) else {
let (Some(task), Some(task_data_plane)) = (task, task_data_plane) else {
tracing::debug!(
?task,
?task_data_plane,
?tasks,
"failed to find matching task in data plane"
);
anyhow::bail!("task {task_name} within data-plane {shard_data_plane_fqdn} is not known")
};

if task.spec_type != CatalogType::Materialization {
anyhow::bail!(
"task shard {shard_id} within data-plane {shard_data_plane_fqdn} is not known"
"task {task_name} must be a materialization, but is {:?} instead",
task.spec_type
)
};
}

// Attempt to find an HMAC key of this data-plane which validates against the request token.
let validation = jsonwebtoken::Validation::default();
@@ -154,5 +181,19 @@ fn evaluate_authorization(
anyhow::bail!("no data-plane keys validated against the token signature");
}

Ok(())
let (Some(ops_logs), Some(ops_stats)) = (
snapshot.collection_by_catalog_name(&task_data_plane.ops_logs_name),
snapshot.collection_by_catalog_name(&task_data_plane.ops_stats_name),
) else {
anyhow::bail!(
"couldn't resolve data-plane {} ops collections",
task.data_plane_id
)
};

let ops_suffix = super::ops_suffix(task);
let ops_logs_journal = format!("{}{}", ops_logs.journal_template_name, &ops_suffix[1..]);
let ops_stats_journal = format!("{}{}", ops_stats.journal_template_name, &ops_suffix[1..]);

Ok((ops_logs_journal, ops_stats_journal))
}
28 changes: 2 additions & 26 deletions crates/agent/src/api/authorize_task.rs
Original file line number Diff line number Diff line change
@@ -93,13 +93,7 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re
required_role,
)
}) {
Ok((
encoding_key,
data_plane_fqdn,
broker_address,
ops_logs_journal,
ops_stats_journal,
)) => {
Ok((encoding_key, data_plane_fqdn, broker_address)) => {
claims.iss = data_plane_fqdn;
claims.exp = claims.iat + super::exp_seconds();

@@ -109,8 +103,6 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re
Ok(Response {
broker_address,
token,
ops_logs_journal,
ops_stats_journal,
..Default::default()
})
}
@@ -129,7 +121,7 @@ fn evaluate_authorization(
token: &str,
journal_name_or_prefix: &str,
required_role: models::Capability,
) -> anyhow::Result<(jsonwebtoken::EncodingKey, String, String, String, String)> {
) -> anyhow::Result<(jsonwebtoken::EncodingKey, String, String)> {
let Snapshot {
collections,
data_planes,
@@ -244,28 +236,12 @@ fn evaluate_authorization(
};
let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key)?;

let (Some(ops_logs), Some(ops_stats)) = (
snapshot.collection_by_catalog_name(&collection_data_plane.ops_logs_name),
snapshot.collection_by_catalog_name(&collection_data_plane.ops_stats_name),
) else {
anyhow::bail!(
"couldn't resolve data-plane {} ops collections",
task.data_plane_id
)
};

let ops_suffix = super::ops_suffix(task);
let ops_logs_journal = format!("{}{}", ops_logs.journal_template_name, &ops_suffix[1..]);
let ops_stats_journal = format!("{}{}", ops_stats.journal_template_name, &ops_suffix[1..]);

Ok((
encoding_key,
collection_data_plane.data_plane_fqdn.clone(),
super::maybe_rewrite_address(
task.data_plane_id != collection.data_plane_id,
&collection_data_plane.broker_address,
),
ops_logs_journal,
ops_stats_journal,
))
}
7 changes: 2 additions & 5 deletions crates/agent/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum::{http::StatusCode, response::IntoResponse};
use std::sync::{Arc, Mutex};

mod authorize_role;
mod authorize_dekaf;
mod authorize_task;
mod authorize_user_collection;
mod authorize_user_task;
@@ -104,10 +104,7 @@ pub fn build_router(

let schema_router = axum::Router::new()
.route("/authorize/task", post(authorize_task::authorize_task))
.route(
"/authorize/role/:role_name",
post(authorize_role::authorize_role),
)
.route("/authorize/dekaf", post(authorize_dekaf::authorize_dekaf))
.route(
"/authorize/user/task",
post(authorize_user_task::authorize_user_task)
61 changes: 2 additions & 59 deletions crates/flow-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ pub async fn fetch_task_authorization(
data_plane_signer: &jsonwebtoken::EncodingKey,
capability: u32,
selector: gazette::broker::LabelSelector,
) -> anyhow::Result<(String, String, gazette::journal::Client)> {
) -> anyhow::Result<gazette::journal::Client> {
let request_token = build_task_authorization_request_token(
shard_template_id,
data_plane_fqdn,
@@ -169,8 +169,6 @@ pub async fn fetch_task_authorization(
let models::authorizations::TaskAuthorization {
broker_address,
token,
ops_logs_journal,
ops_stats_journal,
retry_millis: _,
} = loop {
let response: models::authorizations::TaskAuthorization = client
@@ -202,7 +200,7 @@ pub async fn fetch_task_authorization(
.journal_client
.with_endpoint_and_metadata(broker_address, md);

Ok((ops_logs_journal, ops_stats_journal, journal_client))
Ok(journal_client)
}

fn build_task_authorization_request_token(
@@ -230,61 +228,6 @@ fn build_task_authorization_request_token(
Ok(signed_request_token)
}

// Claims returned by `/authorize/role`
#[derive(Debug, Clone, serde::Deserialize)]
pub struct RoleTokenClaims {
pub role: String,
pub iat: u64,
pub exp: u64,
}

#[tracing::instrument(skip(client, data_plane_signer), err)]
pub async fn fetch_control_plane_authorization(
client: Client,
role: models::authorizations::AllowedRole,
shard_template_id: &str,
data_plane_fqdn: &str,
data_plane_signer: &jsonwebtoken::EncodingKey,
capability: u32,
selector: gazette::broker::LabelSelector,
) -> anyhow::Result<(Client, RoleTokenClaims)> {
let request_token = build_task_authorization_request_token(
shard_template_id,
data_plane_fqdn,
data_plane_signer,
capability,
selector,
)?;

let models::authorizations::RoleAuthorization {
token,
retry_millis: _,
} = loop {
let response: models::authorizations::RoleAuthorization = client
.agent_unary(
format!("/authorize/role/{}", role.to_string()).as_str(),
&models::authorizations::TaskAuthorizationRequest {
token: request_token.clone(),
},
)
.await?;

if response.retry_millis != 0 {
tracing::warn!(
secs = response.retry_millis as f64 / 1000.0,
"authorization service tentatively rejected our request, but will retry before failing"
);
() = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await;
continue;
}
break response;
};

let claims = parse_jwt_claims(token.as_str())?;

Ok((client.with_user_access_token(Some(token)), claims))
}

#[tracing::instrument(skip(client), err)]
pub async fn fetch_user_task_authorization(
client: &Client,
3 changes: 1 addition & 2 deletions crates/flow-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -2,8 +2,7 @@ use anyhow::Context;

pub mod client;
pub use client::{
fetch_collection_authorization, fetch_control_plane_authorization, fetch_task_authorization,
fetch_user_task_authorization, Client,
fetch_collection_authorization, fetch_task_authorization, fetch_user_task_authorization, Client,
};

pub mod pagination;
33 changes: 10 additions & 23 deletions crates/models/src/authorizations.rs
Original file line number Diff line number Diff line change
@@ -46,12 +46,6 @@ pub struct TaskAuthorization {
/// # Address of Gazette brokers for the issued token.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub broker_address: String,
// Name of the journal that contains the logs for the specified task
#[serde(default, skip_serializing_if = "String::is_empty")]
pub ops_logs_journal: String,
// Name of the journal that contains the stats for the specified task
#[serde(default, skip_serializing_if = "String::is_empty")]
pub ops_stats_journal: String,
/// # Number of milliseconds to wait before retrying the request.
/// Non-zero if and only if token is not set.
pub retry_millis: u64,
@@ -154,28 +148,21 @@ pub struct UserTaskAuthorization {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub shard_id_prefix: String,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum AllowedRole {
#[serde(rename = "dekaf")]
Dekaf,
}

impl fmt::Display for AllowedRole {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let role_str = match self {
AllowedRole::Dekaf => "dekaf",
};
write!(f, "{}", role_str)
}
}

#[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RoleAuthorization {
pub struct DekafAuthResponse {
/// # Control plane access token with the requested role
#[serde(default, skip_serializing_if = "String::is_empty")]
pub token: String,
// Name of the journal that contains the logs for the specified task
#[serde(default, skip_serializing_if = "String::is_empty")]
pub ops_logs_journal: String,
// Name of the journal that contains the stats for the specified task
#[serde(default, skip_serializing_if = "String::is_empty")]
pub ops_stats_journal: String,
// Spec of the task
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_spec: Option<crate::materializations::MaterializationDef>,
/// # Number of milliseconds to wait before retrying the request.
/// Non-zero if and only if token is not set.
pub retry_millis: u64,

0 comments on commit 1c24d32

Please sign in to comment.