Agent-api changes for Dekaf #1793

Merged · 6 commits · Dec 5, 2024
3 changes: 3 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

5 changes: 2 additions & 3 deletions crates/agent/Cargo.toml
@@ -7,16 +7,15 @@ authors.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
activate = { path = "../activate" }
agent-sql = { path = "../agent-sql" }
allocator = { path = "../allocator" }
async-process = { path = "../async-process" }
dekaf = { path = "../dekaf" }
build = { path = "../build" }
dekaf = { path = "../dekaf" }
doc = { path = "../doc" }
gazette = { path = "../gazette" }
json = { path = "../json" }
@@ -72,6 +71,6 @@ xxhash-rust = { workspace = true }
[dev-dependencies]
insta = { workspace = true }
md5 = { workspace = true }
tokio = { workspace = true }
serial_test = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
195 changes: 195 additions & 0 deletions crates/agent/src/api/authorize_dekaf.rs
@@ -0,0 +1,195 @@
use super::App;
use crate::api::snapshot::Snapshot;
use anyhow::Context;
use models::CatalogType;
use std::sync::Arc;

type Request = models::authorizations::TaskAuthorizationRequest;
type Response = models::authorizations::DekafAuthResponse;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct AccessTokenClaims {
exp: u64,
iat: u64,
role: String,
}

#[axum::debug_handler]
pub async fn authorize_dekaf(
axum::extract::State(app): axum::extract::State<Arc<App>>,
axum::Json(request): axum::Json<Request>,
) -> axum::response::Response {
super::wrap(async move { do_authorize_dekaf(&app, &request).await }).await
}

const DEKAF_ROLE: &str = "dekaf";

/// Dekaf straddles the control-plane and data-plane:
/// * It needs full control over the `registered_avro_schemas` table in the control-plane
/// in order to serve its schema registry functionality
/// * It needs access to the full specs of materializations, both to authenticate
/// sessions and to figure out which bindings to expose under which names, etc.
/// * It needs to read from journals in order to serve topic data to consumers
/// * It needs to write to ops logs and stats for observability and billing concerns
///
/// This endpoint provides it a way to do all of these things, while also staying within
/// the authorization framework used by other data-plane actors.
///
/// Specifically, this checks that:
/// * Your request is coming from an authorized actor in a data-plane,
/// * That actor is acting on behalf of a task running in that same data-plane.
///
/// Once we've authenticated and authorized the request as best we can, we put together
/// a package of all the information Dekaf needs in one place:
/// * A short-lived control-plane access token to authorize requests under the `dekaf` role
/// which has grants to the `public.registered_avro_schemas` table
/// * The `models::MaterializationDef` for the materialization being requested
/// as identified by the `sub` JWT claim
/// * The ops logs and stats journal names for the materialization. This will allow Dekaf to
/// write ops logs and stats.
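///
/// Illustratively (shape only, not an exhaustive schema): the request body carries a single
/// data-plane-signed JWT, `{"token": "<jwt>"}`, whose `sub` is the materialization name,
/// `iss` is the data-plane FQDN, and `cap` is AUTHORIZE; the response bundles the signed
/// control-plane `token`, the `ops_logs_journal` and `ops_stats_journal` names, and the `task_spec`.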
#[tracing::instrument(skip(app), err(level = tracing::Level::WARN))]
async fn do_authorize_dekaf(app: &App, Request { token }: &Request) -> anyhow::Result<Response> {
let jsonwebtoken::TokenData { header, claims }: jsonwebtoken::TokenData<proto_gazette::Claims> =
{
// In this pass we do not validate the signature,
// because we don't yet know which data-plane the JWT is signed by.
let empty_key = jsonwebtoken::DecodingKey::from_secret(&[]);
let mut validation = jsonwebtoken::Validation::default();
validation.insecure_disable_signature_validation();
jsonwebtoken::decode(token, &empty_key, &validation)
}?;
tracing::debug!(?claims, ?header, "decoded authorization request");

let task_name = claims.sub.as_str();
if task_name.is_empty() {
anyhow::bail!("missing required materialization name (`sub` claim)");
}

let shard_data_plane_fqdn = claims.iss.as_str();
if shard_data_plane_fqdn.is_empty() {
anyhow::bail!("missing required task data-plane FQDN (`iss` claim)");
}

if claims.cap != proto_flow::capability::AUTHORIZE {
anyhow::bail!("invalid capability, must be AUTHORIZE only: {}", claims.cap);
}

match Snapshot::evaluate(&app.snapshot, claims.iat, |snapshot: &Snapshot| {
evaluate_authorization(snapshot, task_name, shard_data_plane_fqdn, token)
}) {
Ok((ops_logs_journal, ops_stats_journal)) => {
let materialization_spec = sqlx::query!(
r#"
select
spec as "spec: sqlx::types::Json<models::MaterializationDef>",
spec_type as "spec_type: models::CatalogType"
from live_specs
where live_specs.catalog_name = $1
"#,
task_name
)
.fetch_one(&app.pg_pool)
.await
.context("failed to fetch task spec")?;

let (Some(materialization_spec), Some(spec_type)) =
(materialization_spec.spec, materialization_spec.spec_type)
else {
anyhow::bail!("`live_specs` row for {task_name} is missing spec or spec_type");
};

if !matches!(spec_type, models::CatalogType::Materialization) {
anyhow::bail!("Unexpected spec type {:?}", spec_type);
}

let unix_ts = jsonwebtoken::get_current_timestamp();
let claims = AccessTokenClaims {
iat: unix_ts,
exp: unix_ts + (60 * 60),
role: DEKAF_ROLE.to_string(),
};

let signed = jsonwebtoken::encode(
&jsonwebtoken::Header::default(),
&claims,
&app.control_plane_jwt_signer,
)?;

Ok(Response {
token: signed,
ops_logs_journal,
ops_stats_journal,
task_spec: Some(materialization_spec.0),
retry_millis: 0,
})
}
Err(Ok(retry_millis)) => Ok(Response {
retry_millis,
..Default::default()
}),
Err(Err(err)) => Err(err),
}
}

fn evaluate_authorization(
snapshot: &Snapshot,
task_name: &str,
shard_data_plane_fqdn: &str,
token: &str,
) -> anyhow::Result<(String, String)> {
tracing::debug!(?task_name, "Task name");
// Map `claims.sub`, a task name, into its task.
let task = snapshot.task_by_catalog_name(&models::Name::new(task_name.to_string()));

// Map `claims.iss`, a data-plane FQDN, into its task-matched data-plane.
let task_data_plane = task.and_then(|task| {
snapshot
.data_planes
.get_by_key(&task.data_plane_id)
.filter(|data_plane| data_plane.data_plane_fqdn == shard_data_plane_fqdn)
});

let (Some(task), Some(task_data_plane)) = (task, task_data_plane) else {
anyhow::bail!("task {task_name} within data-plane {shard_data_plane_fqdn} is not known")
};

if task.spec_type != CatalogType::Materialization {
anyhow::bail!(
"task {task_name} must be a materialization, but is {:?} instead",
task.spec_type
)
}

// Attempt to find an HMAC key of this data-plane which validates against the request token.
let validation = jsonwebtoken::Validation::default();
let mut verified = false;

for hmac_key in &task_data_plane.hmac_keys {
let key = jsonwebtoken::DecodingKey::from_base64_secret(hmac_key)
.context("invalid data-plane hmac key")?;

if jsonwebtoken::decode::<proto_gazette::Claims>(token, &key, &validation).is_ok() {
verified = true;
break;
}
}
if !verified {
anyhow::bail!("no data-plane keys validated against the token signature");
}

let (Some(ops_logs), Some(ops_stats)) = (
snapshot.collection_by_catalog_name(&task_data_plane.ops_logs_name),
snapshot.collection_by_catalog_name(&task_data_plane.ops_stats_name),
) else {
anyhow::bail!(
"couldn't resolve data-plane {} ops collections",
task.data_plane_id
)
};

let ops_suffix = super::ops_suffix(task);
let ops_logs_journal = format!("{}{}", ops_logs.journal_template_name, &ops_suffix[1..]);
let ops_stats_journal = format!("{}{}", ops_stats.journal_template_name, &ops_suffix[1..]);

Ok((ops_logs_journal, ops_stats_journal))
}
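
For reviewers skimming the diff, here is a minimal, self-contained sketch of the two-pass JWT handling used above: first decode with signature validation disabled to learn the issuer, then accept the token only if one of that issuer's HMAC keys verifies the signature. The `Claims` struct is a hypothetical stand-in for `proto_gazette::Claims`, and the function is illustrative rather than part of the PR.

```rust
use anyhow::Context;

// Hypothetical stand-in for `proto_gazette::Claims`; only the fields needed
// for this sketch are included.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Claims {
    sub: String, // task (materialization) name
    iss: String, // data-plane FQDN
    exp: u64,    // required by jsonwebtoken's default Validation
}

fn verify_against_data_plane_keys(token: &str, hmac_keys: &[String]) -> anyhow::Result<Claims> {
    // Pass 1: decode without verifying the signature, because we don't yet
    // know which data-plane (and therefore which keys) signed the token.
    let unverified = {
        let empty_key = jsonwebtoken::DecodingKey::from_secret(&[]);
        let mut validation = jsonwebtoken::Validation::default();
        validation.insecure_disable_signature_validation();
        jsonwebtoken::decode::<Claims>(token, &empty_key, &validation)?
    };
    tracing::debug!(iss = %unverified.claims.iss, "decoded unverified issuer");

    // Pass 2: accept the token only if some HMAC key of the matched
    // data-plane validates its signature.
    let validation = jsonwebtoken::Validation::default();
    for hmac_key in hmac_keys {
        let key = jsonwebtoken::DecodingKey::from_base64_secret(hmac_key)
            .context("invalid data-plane hmac key")?;
        if let Ok(verified) = jsonwebtoken::decode::<Claims>(token, &key, &validation) {
            return Ok(verified.claims);
        }
    }
    anyhow::bail!("no data-plane keys validated against the token signature")
}
```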
50 changes: 38 additions & 12 deletions crates/agent/src/api/authorize_task.rs
@@ -13,6 +13,18 @@ pub async fn authorize_task(
super::wrap(async move { do_authorize_task(&app, &request).await }).await
}

/// Authorizes some set of actions to be performed on a particular collection by way of a task.
/// This checks that:
/// * The request is `iss`ued by an actor in a particular data plane
/// * Validated by checking the request signature against the HMACs for the `iss`uer data-plane
/// * The request is on behalf of a `sub`ject task running in that data plane
/// * The subject task is identified by its `shard_template_id`, not just its name.
/// * The request is to perform some `cap`abilities on a particular collection
/// * The collection is identified by its `journal_template_name`, not just its name.
/// * The target collection is specified as a label selector for the label `name`
/// * The request's subject is granted those capabilities on that collection by
/// the control-plane
/// * The requested collection may be in a different data plane than the issuer.
#[tracing::instrument(skip(app), err(level = tracing::Level::WARN))]
async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Result<Response> {
let jsonwebtoken::TokenData { header, mut claims }: jsonwebtoken::TokenData<
@@ -47,16 +59,28 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re
claims.cap &= !proto_flow::capability::AUTHORIZE;

// Validate and match the requested capabilities to a corresponding role.
// NOTE: Because we pass through the claims after validating them here,
// we need to explicitly enumerate and exactly match every case, as just
// checking that the requested capability contains a particular grant isn't enough.
// For example, we wouldn't want to allow a request for `REPLICATE` just
// because it also requests `READ`.
let required_role = match claims.cap {
proto_gazette::capability::LIST | proto_gazette::capability::READ => {
cap if (cap == proto_gazette::capability::LIST)
|| (cap == proto_gazette::capability::READ)
|| (cap == (proto_gazette::capability::LIST | proto_gazette::capability::READ)) =>
{
models::Capability::Read
}
proto_gazette::capability::APPLY | proto_gazette::capability::APPEND => {
// We're intentionally rejecting requests for both APPLY and APPEND, as those two
// grants authorize wildly different capabilities, and no sane logic should
// need both at the same time. So as a sanity check/defense-in-depth measure
// we won't grant you a token that has both, even if we technically could.
cap if (cap == proto_gazette::capability::APPLY)
|| (cap == proto_gazette::capability::APPEND) =>
{
models::Capability::Write
}
cap => {
anyhow::bail!("capability {cap} cannot be authorized by this service");
}
cap => anyhow::bail!("capability {cap} cannot be authorized by this service"),
};

match Snapshot::evaluate(&app.snapshot, claims.iat, |snapshot: &Snapshot| {
@@ -91,19 +115,21 @@ async fn do_authorize_task(app: &App, Request { token }: &Request) -> anyhow::Re
}

fn evaluate_authorization(
Snapshot {
collections,
data_planes,
role_grants,
tasks,
..
}: &Snapshot,
snapshot: &Snapshot,
shard_id: &str,
shard_data_plane_fqdn: &str,
token: &str,
journal_name_or_prefix: &str,
required_role: models::Capability,
) -> anyhow::Result<(jsonwebtoken::EncodingKey, String, String)> {
let Snapshot {
collections,
data_planes,
role_grants,
tasks,
..
} = snapshot;

// Map `claims.sub`, a Shard ID, into its task.
let task = tasks
.binary_search_by(|task| {
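
A side note on the capability-matching change above: the arms match exact bit combinations rather than testing for a contained bit. A tiny illustrative sketch of why that distinction matters, using made-up bit values rather than the real `proto_gazette::capability` constants:

```rust
// Hypothetical capability bits, for illustration only.
const LIST: u32 = 1 << 0;
const READ: u32 = 1 << 1;
const REPLICATE: u32 = 1 << 2;

fn required_role(cap: u32) -> Option<&'static str> {
    match cap {
        // Exact matches: LIST, READ, or exactly LIST | READ.
        c if c == LIST || c == READ || c == (LIST | READ) => Some("read"),
        // A containment check like `cap & READ != 0` would also accept
        // READ | REPLICATE, silently granting more than was vetted.
        _ => None,
    }
}

fn main() {
    assert_eq!(required_role(LIST | READ), Some("read"));
    assert_eq!(required_role(READ | REPLICATE), None);
}
```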
12 changes: 7 additions & 5 deletions crates/agent/src/api/mod.rs
@@ -1,6 +1,7 @@
use axum::{http::StatusCode, response::IntoResponse};
use std::sync::{Arc, Mutex};

mod authorize_dekaf;
mod authorize_task;
mod authorize_user_collection;
mod authorize_user_task;
@@ -42,7 +43,8 @@ pub enum Rejection {

struct App {
id_generator: Mutex<models::IdGenerator>,
jwt_secret: jsonwebtoken::DecodingKey,
control_plane_jwt_verifier: jsonwebtoken::DecodingKey,
control_plane_jwt_signer: jsonwebtoken::EncodingKey,
jwt_validation: jsonwebtoken::Validation,
pg_pool: sqlx::PgPool,
publisher: crate::publications::Publisher,
@@ -57,16 +59,15 @@ pub fn build_router(
publisher: crate::publications::Publisher,
allow_origin: &[String],
) -> anyhow::Result<axum::Router<()>> {
let jwt_secret = jsonwebtoken::DecodingKey::from_secret(&jwt_secret);

let mut jwt_validation = jsonwebtoken::Validation::default();
jwt_validation.set_audience(&["authenticated"]);

let (snapshot, seed_rx) = snapshot::seed();

let app = Arc::new(App {
id_generator: Mutex::new(id_generator),
jwt_secret,
control_plane_jwt_verifier: jsonwebtoken::DecodingKey::from_secret(&jwt_secret),
control_plane_jwt_signer: jsonwebtoken::EncodingKey::from_secret(&jwt_secret),
jwt_validation,
pg_pool,
publisher,
@@ -103,6 +104,7 @@

let schema_router = axum::Router::new()
.route("/authorize/task", post(authorize_task::authorize_task))
.route("/authorize/dekaf", post(authorize_dekaf::authorize_dekaf))
.route(
"/authorize/user/task",
post(authorize_user_task::authorize_user_task)
@@ -192,7 +194,7 @@
) -> axum::response::Response {
let token = match jsonwebtoken::decode::<ControlClaims>(
bearer.token(),
&app.jwt_secret,
&app.control_plane_jwt_verifier,
&app.jwt_validation,
) {
Ok(claims) => claims,
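
The `mod.rs` change derives a verifying key and a signing key from the same control-plane secret, letting the agent both mint tokens (as `authorize_dekaf` does) and verify bearer tokens. A minimal sketch of that symmetric-key sign/verify round-trip with `jsonwebtoken`, assuming HS256 and a hypothetical secret and claims type:

```rust
#[derive(serde::Serialize, serde::Deserialize)]
struct Claims {
    role: String,
    iat: u64,
    exp: u64,
}

fn main() -> anyhow::Result<()> {
    let secret = b"example-control-plane-secret"; // hypothetical secret bytes
    let signer = jsonwebtoken::EncodingKey::from_secret(secret);
    let verifier = jsonwebtoken::DecodingKey::from_secret(secret);

    let now = jsonwebtoken::get_current_timestamp();
    let claims = Claims { role: "dekaf".to_string(), iat: now, exp: now + 3600 };

    // Sign with the EncodingKey (as `authorize_dekaf` does) ...
    let token = jsonwebtoken::encode(&jsonwebtoken::Header::default(), &claims, &signer)?;

    // ... and verify with the DecodingKey derived from the same secret.
    let decoded = jsonwebtoken::decode::<Claims>(
        &token,
        &verifier,
        &jsonwebtoken::Validation::default(),
    )?;
    assert_eq!(decoded.claims.role, "dekaf");
    Ok(())
}
```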
1 change: 1 addition & 0 deletions crates/agent/src/api/snapshot.rs
@@ -37,6 +37,7 @@ pub struct SnapshotCollection {
}
// SnapshotTask is the state of a live task which influences authorization.
// It's indexed on `shard_template_id`.
#[derive(Debug)]
pub struct SnapshotTask {
// Template shard ID which prefixes all shard IDs of the task.
pub shard_template_id: String,
3 changes: 3 additions & 0 deletions crates/flow-client/Cargo.toml
@@ -12,10 +12,13 @@ license.workspace = true
gazette = { path = "../gazette" }
models = { path = "../models" }
ops = { path = "../ops" }
proto-flow = { path = "../proto-flow" }
proto-gazette = { path = "../proto-gazette" }

anyhow = { workspace = true }
base64 = { workspace = true }
futures = { workspace = true }
jsonwebtoken = { workspace = true }
lazy_static = { workspace = true }
page-turner = { workspace = true }
postgrest = { workspace = true }