Skip to content

Commit

Permalink
dekaf: Add further support for authorizing requests using `/authorize…
Browse files Browse the repository at this point in the history
…/role` and `/authorize/task`
  • Loading branch information
jshearer committed Nov 22, 2024
1 parent e307198 commit 1fe3c5c
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 151 deletions.
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 16 additions & 24 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@ pub use api_client::KafkaApiClient;
use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use connector::{DekafConfig, DeletionMode};
use flow_client::client::{
fetch_control_plane_authorization, fetch_task_authorization, refresh_authorizations,
RefreshToken,
fetch_control_plane_authorization, refresh_authorizations, RefreshToken,
};
use models::{authorizations::ControlClaims, Materialization};
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use proto_flow::flow::MaterializationSpec;
use serde::{Deserialize, Serialize};
use std::time::{Instant, SystemTime};
use std::time::SystemTime;

pub struct App {
/// Hostname which is advertised for Kafka access.
Expand Down Expand Up @@ -70,12 +67,12 @@ pub struct UserAuth {

pub struct TaskAuth {
client: flow_client::Client,
name: String,
task_name: String,
config: DekafConfig,
bindings: Vec<models::MaterializationBinding>,

// Needed to refresh auth token
claims: ControlClaims,
// When access token expires
exp: time::OffsetDateTime,
}

pub enum SessionAuthentication {
Expand Down Expand Up @@ -151,23 +148,21 @@ impl TaskAuth {
&mut self,
app: &App,
) -> anyhow::Result<&flow_client::Client> {
if self.claims.time_remaining().whole_seconds() < 60 {
if (self.exp - time::OffsetDateTime::now_utc()).whole_seconds() < 60 {
let (client, claims) = fetch_control_plane_authorization(
self.client.clone(),
models::authorizations::AllowedRole::Dekaf,
&dekaf_shard_template_id(&self.name),
&dekaf_shard_template_id(&self.task_name),
&app.data_plane_fqdn,
&app.data_plane_signer,
proto_flow::capability::AUTHORIZE & proto_gazette::capability::READ,
gazette::broker::LabelSelector {
include: Some(labels::build_set([(labels::TASK_NAME, self.name.as_str())])),
exclude: None,
},
proto_flow::capability::AUTHORIZE,
Default::default(),
)
.await?;

self.client = client.with_fresh_gazette_client();
self.claims = claims;
self.exp =
time::OffsetDateTime::UNIX_EPOCH + time::Duration::seconds(claims.exp as i64);
}

Ok(&self.client)
Expand Down Expand Up @@ -197,11 +192,8 @@ impl App {
&dekaf_shard_template_id(&username),
&self.data_plane_fqdn,
&self.data_plane_signer,
proto_flow::capability::AUTHORIZE & proto_gazette::capability::READ,
gazette::broker::LabelSelector {
include: Some(labels::build_set([(labels::TASK_NAME, username.as_str())])),
exclude: None,
},
proto_flow::capability::AUTHORIZE,
Default::default(),
)
.await?;

Expand All @@ -220,11 +212,11 @@ impl App {
}

Ok(SessionAuthentication::Task(TaskAuth {
name: username,
task_name: username,
config,
bindings: task_spec.bindings.clone(),
client,
claims,
exp: time::OffsetDateTime::UNIX_EPOCH + time::Duration::seconds(claims.exp as i64),
}))
} else if username.contains("{") {
let raw_token = String::from_utf8(base64::decode(password)?.to_vec())?;
Expand Down Expand Up @@ -596,7 +588,7 @@ fn decode_safe_name(safe_name: String) -> anyhow::Result<String> {
/// used throughout the data-plane. Dekaf materializations, on the other hand, have predictable
/// shard template IDs since they never publish shards whose names could conflict.
fn dekaf_shard_template_id(task_name: &str) -> String {
format!("materialize/{task_name}/0000000000000000")
format!("materialize/{task_name}/0000000000000000/")
}

/// Modified from [this](https://github.com/serde-rs/serde/issues/368#issuecomment-1579475447)
Expand Down
46 changes: 33 additions & 13 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use axum_server::tls_rustls::RustlsConfig;
use clap::{Args, Parser};
use dekaf::{KafkaApiClient, Session};
use flow_client::{
DEFAULT_AGENT_URL, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, LOCAL_PG_PUBLIC_TOKEN, LOCAL_PG_URL,
DEFAULT_AGENT_URL, DEFAULT_DATA_PLANE_FQDN, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL,
LOCAL_AGENT_URL, LOCAL_DATA_PLANE_FQDN, LOCAL_DATA_PLANE_HMAC, LOCAL_PG_PUBLIC_TOKEN,
LOCAL_PG_URL,
};
use futures::{FutureExt, TryStreamExt};
use rsasl::config::SASLConfig;
Expand All @@ -29,20 +31,30 @@ pub struct Cli {
#[arg(
long,
default_value = DEFAULT_PG_URL.as_str(),
default_value_if("local", "true", Some(LOCAL_PG_URL.as_str())),
env = "API_ENDPOINT"
)]
api_endpoint: Url,
/// Public (anon) API key to use during authentication to the Estuary API.
#[arg(
long,
default_value = DEFAULT_PG_PUBLIC_TOKEN,
default_value_if("local", "true", Some(LOCAL_PG_PUBLIC_TOKEN)),
env = "API_KEY"
)]
api_key: String,
/// Endpoint of the Estuary agent API to use.
#[arg(
long,
default_value = DEFAULT_AGENT_URL.as_str(),
default_value_if("local", "true", Some(LOCAL_AGENT_URL.as_str())),
env = "AGENT_ENDPOINT"
)]
agent_endpoint: Url,

/// When true, override the configured API endpoint and token
/// in favor of a local control plane.
#[arg(long)]
#[arg(long, action(clap::ArgAction::SetTrue))]
local: bool,
/// The hostname to advertise when enumerating Kafka "brokers".
/// This is the hostname at which `dekaf` may be accessed.
Expand Down Expand Up @@ -87,11 +99,25 @@ pub struct Cli {
idle_session_timeout: std::time::Duration,

/// The fully-qualified domain name of the data plane that Dekaf is running inside of
#[arg(long, env = "DATA_PLANE_FQDN")]
#[arg(
long,
env = "DATA_PLANE_FQDN",
default_value=DEFAULT_DATA_PLANE_FQDN,
default_value_if("local", "true", Some(LOCAL_DATA_PLANE_FQDN)),
)]
data_plane_fqdn: String,
/// An HMAC key recognized by the data plane that Dekaf is running inside of. Used to
/// sign data-plane access token requests.
#[arg(long, env = "DATA_PLANE_ACCESS_KEY")]
#[arg(
long,
env = "DATA_PLANE_ACCESS_KEY",
default_value_if("local", "true", Some(LOCAL_DATA_PLANE_HMAC)),
// This is a workaround for clap_derive's somewhat buggy handling of `default_value_if`.
// The end result is that `data_plane_access_key` is required iff `--local` is not specified.
// If `--local` is specified, it will use the value in `LOCAL_DATA_PLANE_HMAC` unless overridden.
// See https://github.com/clap-rs/clap/issues/4086 and https://github.com/clap-rs/clap/issues/4918
required(false)
)]
data_plane_access_key: String,

#[command(flatten)]
Expand Down Expand Up @@ -125,12 +151,6 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
tracing::info!("Starting dekaf");

let (api_endpoint, api_key) = if cli.local {
(LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string())
} else {
(cli.api_endpoint, cli.api_key)
};

let upstream_kafka_host = format!(
"tcp://{}:{}",
cli.default_broker_hostname, cli.default_broker_port
Expand All @@ -154,9 +174,9 @@ async fn main() -> anyhow::Result<()> {
data_plane_signer: jsonwebtoken::EncodingKey::from_base64_secret(&cli.data_plane_access_key)?,
data_plane_fqdn: cli.data_plane_fqdn,
client_base: flow_client::Client::new(
DEFAULT_AGENT_URL.to_owned(),
api_key,
api_endpoint,
cli.agent_endpoint,
cli.api_key,
cli.api_endpoint,
None,
)
});
Expand Down
4 changes: 1 addition & 3 deletions crates/dekaf/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use super::App;
use crate::{
from_downstream_topic_name, to_downstream_topic_name, topology, SessionAuthentication,
};
use crate::{from_downstream_topic_name, to_downstream_topic_name, SessionAuthentication};
use anyhow::Context;
use axum::response::{IntoResponse, Response};
use axum_extra::headers;
Expand Down
Loading

0 comments on commit 1fe3c5c

Please sign in to comment.