Skip to content

Commit

Permalink
Support for OIDC, dex installation, and authorization configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Oct 21, 2024
1 parent b42cb6a commit 2bda52a
Show file tree
Hide file tree
Showing 52 changed files with 1,814 additions and 365 deletions.
4 changes: 4 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-oidc</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import org.eclipse.microprofile.openapi.annotations.tags.Tag;

import com.github.streamshub.console.api.model.ConfigEntry;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.BrokerService;
import com.github.streamshub.console.config.security.Privilege;

@Path("/api/kafkas/{clusterId}/nodes")
@Tag(name = "Kafka Cluster Resources")
Expand All @@ -32,6 +35,8 @@ public class BrokersResource {
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(action = Privilege.GET)
public CompletionStage<Response> describeConfigs(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand All @@ -41,7 +46,7 @@ public CompletionStage<Response> describeConfigs(
@Parameter(description = "Node identifier")
String nodeId) {

return brokerService.describeConfigs(nodeId)
return brokerService.describeConfigs(clusterId, nodeId)
.thenApply(ConfigEntry.ConfigResponse::new)
.thenApply(Response::ok)
.thenApply(Response.ResponseBuilder::build);
Expand Down
122 changes: 44 additions & 78 deletions api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -51,8 +50,6 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
Expand All @@ -61,6 +58,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.github.streamshub.console.api.security.SaslJaasConfigCredential;
import com.github.streamshub.console.api.service.KafkaClusterService;
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
Expand All @@ -72,6 +70,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.quarkus.security.identity.SecurityIdentity;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
import io.strimzi.api.kafka.model.kafka.KafkaSpec;
Expand Down Expand Up @@ -101,20 +100,11 @@ public class ClientFactory {
public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
public static final String SCRAM_SHA512 = "SCRAM-SHA-512";

private static final String BEARER = "Bearer ";
private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ " required"
+ " oauth.access.token=\"%s\" ;";

private static final String BASIC = "Basic ";
private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";
private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName());
private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName());

static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
public static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
private final Function<String, NotFoundException> noSuchKafka =
clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName));
clusterId -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterId));

@Inject
Logger log;
Expand Down Expand Up @@ -278,7 +268,7 @@ public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
findConfig(kafka).ifPresentOrElse(
clusterConfig -> {
String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, Optional.of(kafka));
String clusterId = KafkaContext.clusterId(clusterConfig, Optional.of(kafka));
log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
KafkaContext previous = contexts.remove(clusterId);
Expand Down Expand Up @@ -332,7 +322,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs));

String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, kafkaResource);
String clusterId = KafkaContext.clusterId(clusterConfig, kafkaResource);

if (contexts.containsKey(clusterId) && !allowReplacement) {
log.warnf("""
Expand Down Expand Up @@ -362,12 +352,6 @@ boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kaf
return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty();
}

String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
return Optional.ofNullable(clusterConfig.getId())
.or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
.orElseGet(clusterConfig::getName);
}

Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
.map(store -> store.getByKey(clusterConfig.clusterKey()))
Expand Down Expand Up @@ -511,6 +495,7 @@ void disposeKafkaContexts(@Disposes Map<String, KafkaContext> contexts) {
@Produces
@RequestScoped
public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
SecurityIdentity identity,
UnaryOperator<Admin> filter,
Function<Map<String, Object>, Admin> adminBuilder) {

Expand All @@ -520,22 +505,28 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
return KafkaContext.EMPTY;
}

return Optional.ofNullable(contexts.get(clusterId))
.map(ctx -> {
if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*/
var adminConfigs = maybeAuthenticate(ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}
KafkaContext ctx = contexts.get(clusterId);

return ctx;
})
.orElseThrow(() -> noSuchKafka.apply(clusterId));
if (ctx == null) {
throw noSuchKafka.apply(clusterId);
}

if (identity.isAnonymous()) {
return ctx;
}

if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*/
var adminConfigs = maybeAuthenticate(identity, ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}

return ctx;
}

public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
Expand All @@ -552,8 +543,8 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context, SecurityIdentity identity) {
var configs = maybeAuthenticate(identity, context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;
}
Expand All @@ -564,8 +555,8 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consum

@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context, SecurityIdentity identity) {
var configs = maybeAuthenticate(identity, context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
}
Expand All @@ -574,13 +565,13 @@ public void producerDisposer(@Disposes Supplier<Producer<String, String>> produc
producer.get().close();
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Map<String, Object> maybeAuthenticate(SecurityIdentity identity, KafkaContext context, Class<?> clientType) {
Map<String, Object> configs = context.configs(clientType);

if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
&& !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
configs = new HashMap<>(configs);
configureAuthentication(context.saslMechanism(clientType), configs);
configureAuthentication(identity, context.saslMechanism(clientType), configs);
}

return configs;
Expand Down Expand Up @@ -756,63 +747,38 @@ void logConfig(String clientType, Map<String, Object> config) {
}
}

void configureAuthentication(String saslMechanism, Map<String, Object> configs) {
void configureAuthentication(SecurityIdentity identity, String saslMechanism, Map<String, Object> configs) {
SaslJaasConfigCredential credential = identity.getCredential(SaslJaasConfigCredential.class);

switch (saslMechanism) {
case OAUTHBEARER:
configureOAuthBearer(configs);
configureOAuthBearer(credential, configs);
break;
case PLAIN:
configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE);
configureBasic(credential, configs);
break;
case SCRAM_SHA256, SCRAM_SHA512:
configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE);
configureBasic(credential, configs);
break;
default:
throw new NotAuthorizedException("Unknown");
}
}

void configureOAuthBearer(Map<String, Object> configs) {
void configureOAuthBearer(SaslJaasConfigCredential credential, Map<String, Object> configs) {
log.trace("SASL/OAUTHBEARER enabled");

configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
// Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin)
// May still cause warnings to be logged when token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS.
configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0");

String jaasConfig = getAuthorization(BEARER)
.map(SASL_OAUTH_CONFIG_TEMPLATE::formatted)
.orElseThrow(() -> new NotAuthorizedException(BEARER.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value());
}

void configureBasic(Map<String, Object> configs, String template) {
void configureBasic(SaslJaasConfigCredential credential, Map<String, Object> configs) {
log.trace("SASL/SCRAM enabled");

String jaasConfig = getBasicAuthentication()
.map(template::formatted)
.orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

Optional<String[]> getBasicAuthentication() {
return getAuthorization(BASIC)
.map(Base64.getDecoder()::decode)
.map(String::new)
.filter(authn -> authn.indexOf(':') >= 0)
.map(authn -> new String[] {
authn.substring(0, authn.indexOf(':')),
authn.substring(authn.indexOf(':') + 1)
})
.filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
}

Optional<String> getAuthorization(String scheme) {
return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION))
.filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
.map(header -> header.substring(scheme.length()));
configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value());
}

private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@
import com.github.streamshub.console.api.model.ConsumerGroup;
import com.github.streamshub.console.api.model.ConsumerGroupFilterParams;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.ConsumerGroupService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;
import com.github.streamshub.console.config.security.Privilege;

import io.xlate.validation.constraints.Expression;

Expand All @@ -67,6 +70,8 @@ public class ConsumerGroupsResource {
@APIResponseSchema(ConsumerGroup.ListResponse.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(action = Privilege.LIST)
public CompletionStage<Response> listConsumerGroups(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -132,6 +137,8 @@ public CompletionStage<Response> listConsumerGroups(
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(action = Privilege.GET)
public CompletionStage<Response> describeConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -200,6 +207,8 @@ public CompletionStage<Response> describeConsumerGroup(
node = { "data", "id" },
payload = ErrorCategory.InvalidResource.class,
validationAppliesTo = ConstraintTarget.PARAMETERS)
@Authorized
@ResourcePrivilege(action = Privilege.UPDATE)
public CompletionStage<Response> patchConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -244,6 +253,8 @@ public CompletionStage<Response> patchConsumerGroup(
@Path("{groupId}")
@DELETE
@APIResponseSchema(responseCode = "204", value = Void.class)
@Authorized
@ResourcePrivilege(action = Privilege.DELETE)
public CompletionStage<Response> deleteConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@

import com.github.streamshub.console.api.model.KafkaCluster;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.KafkaClusterService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;
import com.github.streamshub.console.config.security.Privilege;

import io.xlate.validation.constraints.Expression;

Expand All @@ -63,6 +66,8 @@ public class KafkaClustersResource {
@APIResponseSchema(KafkaCluster.KafkaClusterDataList.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(action = Privilege.LIST)
public Response listClusters(
@QueryParam(KafkaCluster.FIELDS_PARAM)
@DefaultValue(KafkaCluster.Fields.LIST_DEFAULT)
Expand Down Expand Up @@ -121,6 +126,8 @@ public Response listClusters(
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(action = Privilege.GET)
public CompletionStage<Response> describeCluster(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
Expand Down Expand Up @@ -172,7 +179,7 @@ public CompletionStage<Response> describeCluster(

requestedFields.accept(fields);

return clusterService.describeCluster(fields)
return clusterService.describeCluster(clusterId, fields)
.thenApply(KafkaCluster.KafkaClusterData::new)
.thenApply(Response::ok)
.thenApply(Response.ResponseBuilder::build);
Expand Down
Loading

0 comments on commit 2bda52a

Please sign in to comment.