Authorization, audit logging, optional OIDC authN, UI error handling
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Dec 13, 2024
1 parent 6a39505 commit da9cc56
Showing 164 changed files with 6,178 additions and 1,875 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -56,14 +56,19 @@ Prometheus is an optional dependency of the console if cluster metrics are to be
- User-supplied Prometheus instances
- Private Prometheus instance for each `Console`. The operator creates a managed Prometheus deployment for use only by the console.

#### OIDC Provider
The console may be configured to use an OpenID Connect (OIDC) provider for user authentication. An example using [dex](https://dexidp.io/) for OIDC with an OpenShift identity provider is available in [examples/dex-openshift](./examples/dex-openshift).
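
As a rough orientation only, the sketch below shows the general shape of a dex configuration fronting an OpenShift identity provider; every URL, client ID, and secret here is a placeholder rather than a value taken from this repository, so treat [examples/dex-openshift](./examples/dex-openshift) as the authoritative configuration.

```yaml
# Sketch of a dex configuration for the console (placeholder values throughout)
issuer: https://dex.example.com            # OIDC issuer URL the console would be pointed at
storage:
  type: memory                             # fine for a demo; use a persistent store otherwise
staticClients:
  - id: streamshub-console                 # client ID presented by the console
    name: StreamsHub Console
    secret: change-me                      # client secret shared with the console
    redirectURIs:
      - https://console.example.com/callback   # placeholder redirect back to the console UI
connectors:
  - type: openshift                        # delegate user login to the OpenShift OAuth server
    id: openshift
    name: OpenShift
    config:
      issuer: https://api.cluster.example.com:6443
      clientID: dex                        # placeholder OpenShift OAuth client
      clientSecret: change-me
      redirectURI: https://dex.example.com/callback
```

The console's OIDC settings then reference the dex issuer and the static client credentials above.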

### Deploy the operator with OLM
The preferred way to deploy the console is using the Operator Lifecycle Manager, or OLM. The sample install files in `install/operator-olm` will install the operator with cluster-wide scope. This means that `Console` instances may be created in any namespace. If you wish to limit the scope of the operator, the `OperatorGroup` resource may be modified to specify only the namespace that should be watched by the operator.

This example will create the operator's OLM resources in the `default` namespace. Modify the `NAMESPACE` variable according to your needs.

```shell
export NAMESPACE=default
cat install/operator-olm/*.yaml | envsubst | kubectl apply -n ${NAMESPACE} -f -
```
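
To restrict the operator to a single namespace as described above, edit the `OperatorGroup` from the install files so that it lists an explicit target namespace. A minimal sketch follows; the resource name is a placeholder, so keep whatever name `install/operator-olm` already uses.

```yaml
apiVersion: operators.coreos.com/v1
kind: OperatorGroup
metadata:
  name: console-operator        # placeholder; reuse the name from install/operator-olm
  namespace: ${NAMESPACE}
spec:
  targetNamespaces:
    - ${NAMESPACE}              # the operator only watches this namespace
```

Leaving `spec.targetNamespaces` unset (with no selector) keeps the default cluster-wide scope, which is what the sample install files provide.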

#### Console Custom Resource Example
Once the operator is ready, you may then create a `Console` resource in the namespace where the console should be deployed. This example `Console` is based on the example Apache Kafka<sup>®</sup> cluster deployed above in the [prerequisites section](#prerequisites). Also see [examples/console/010-Console-example.yaml](examples/console/010-Console-example.yaml).
```yaml
18 changes: 14 additions & 4 deletions api/pom.xml
@@ -88,6 +88,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-oidc</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
@@ -335,13 +339,19 @@
</configuration>
</execution>
<execution>
<id>default-report-integration</id>
<id>report-aggregate</id>
<goals>
<goal>report-integration</goal>
<goal>report-aggregate</goal>
</goals>
<phase>verify</phase>
<configuration>
<dataFile>${project.build.directory}/jacoco-quarkus.exec</dataFile>
<formats>XML</formats>
<includeCurrentProject>true</includeCurrentProject>
<excludes>
<exclude>com/github/streamshub/console/config/**/*Builder.class</exclude>
<exclude>com/github/streamshub/console/config/**/*Fluent.class</exclude>
<exclude>com/github/streamshub/console/config/**/*Nested.class</exclude>
</excludes>
<outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory>
</configuration>
</execution>
</executions>
api/src/main/java/com/github/streamshub/console/api/BrokersResource.java
@@ -16,7 +16,10 @@
import org.eclipse.microprofile.openapi.annotations.tags.Tag;

import com.github.streamshub.console.api.model.ConfigEntry;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.BrokerService;
import com.github.streamshub.console.config.security.Privilege;

@Path("/api/kafkas/{clusterId}/nodes")
@Tag(name = "Kafka Cluster Resources")
@@ -32,6 +35,8 @@ public class BrokersResource {
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.GET)
public CompletionStage<Response> describeConfigs(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
117 changes: 45 additions & 72 deletions api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -1,6 +1,5 @@
package com.github.streamshub.console.api;

import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -46,14 +45,13 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.security.SaslJaasConfigCredential;
import com.github.streamshub.console.api.service.MetricsService;
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
@@ -67,6 +65,7 @@
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.quarkus.security.identity.SecurityIdentity;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
import io.strimzi.api.kafka.model.kafka.KafkaSpec;
@@ -96,20 +95,11 @@ public class ClientFactory {
public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
public static final String SCRAM_SHA512 = "SCRAM-SHA-512";

private static final String BEARER = "Bearer ";
private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ " required"
+ " oauth.access.token=\"%s\" ;";

private static final String BASIC = "Basic ";
private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";
private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName());
private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName());

static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
private final Function<String, NotFoundException> noSuchKafka =
clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName));
public static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
public static final Function<String, NotFoundException> NO_SUCH_KAFKA =
clusterId -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterId));

@Inject
Logger log;
@@ -439,7 +429,7 @@ void disposeKafkaContexts(@Disposes Map<String, KafkaContext> contexts) {
log.infof("Closing all known KafkaContexts");

contexts.values().parallelStream().forEach(context -> {
log.infof("Closing KafkaContext %s", Cache.metaNamespaceKeyFunc(context.resource()));
log.infof("Closing KafkaContext %s", context.clusterId());
try {
context.close();
} catch (Exception e) {
@@ -464,6 +454,7 @@ void disposeKafkaContexts(@Disposes Map<String, KafkaContext> contexts) {
@Produces
@RequestScoped
public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
SecurityIdentity identity,
UnaryOperator<Admin> filter,
Function<Map<String, Object>, Admin> adminBuilder) {

@@ -473,22 +464,29 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
return KafkaContext.EMPTY;
}

return Optional.ofNullable(contexts.get(clusterId))
.map(ctx -> {
if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*/
var adminConfigs = maybeAuthenticate(ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}
KafkaContext ctx = contexts.get(clusterId);

return ctx;
})
.orElseThrow(() -> noSuchKafka.apply(clusterId));
if (ctx == null) {
throw NO_SUCH_KAFKA.apply(clusterId);
}

if (ctx.admin() == null) {
/*
* Admin may be null if credentials were not given in the
* configuration. The user must provide the login secrets
* in the request in that case.
*
* The identity should already carry the SASL credentials
* at this point (set in ConsoleAuthenticationMechanism),
* so here we will only retrieve them (if applicable) and
* set them in the admin configuration map.
*/
var adminConfigs = maybeAuthenticate(identity, ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}

return ctx;
}

public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
@@ -505,8 +503,8 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Consumer<RecordData, RecordData> consumerSupplier(KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
public Consumer<RecordData, RecordData> consumerSupplier(SecurityIdentity identity, KafkaContext context) {
var configs = maybeAuthenticate(identity, context, Consumer.class);

return new KafkaConsumer<>(
configs,
@@ -520,8 +518,8 @@ public void disposeConsumer(@Disposes Consumer<RecordData, RecordData> consumer)

@Produces
@RequestScoped
public Producer<RecordData, RecordData> producerSupplier(KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
public Producer<RecordData, RecordData> producerSupplier(SecurityIdentity identity, KafkaContext context) {
var configs = maybeAuthenticate(identity, context, Producer.class);
return new KafkaProducer<>(
configs,
context.schemaRegistryContext().keySerializer(),
@@ -532,13 +530,13 @@ public void disposeProducer(@Disposes Producer<RecordData, RecordData> producer)
producer.close();
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Map<String, Object> maybeAuthenticate(SecurityIdentity identity, KafkaContext context, Class<?> clientType) {
Map<String, Object> configs = context.configs(clientType);

if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
&& !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
configs = new HashMap<>(configs);
configureAuthentication(context.saslMechanism(clientType), configs);
configureAuthentication(identity, context.saslMechanism(clientType), configs);
}

return configs;
@@ -697,63 +695,38 @@ void logConfig(String clientType, Map<String, Object> config) {
}
}

void configureAuthentication(String saslMechanism, Map<String, Object> configs) {
void configureAuthentication(SecurityIdentity identity, String saslMechanism, Map<String, Object> configs) {
SaslJaasConfigCredential credential = identity.getCredential(SaslJaasConfigCredential.class);

switch (saslMechanism) {
case OAUTHBEARER:
configureOAuthBearer(configs);
configureOAuthBearer(credential, configs);
break;
case PLAIN:
configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE);
configureBasic(credential, configs);
break;
case SCRAM_SHA256, SCRAM_SHA512:
configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE);
configureBasic(credential, configs);
break;
default:
throw new NotAuthorizedException("Unknown");
}
}

void configureOAuthBearer(Map<String, Object> configs) {
void configureOAuthBearer(SaslJaasConfigCredential credential, Map<String, Object> configs) {
log.trace("SASL/OAUTHBEARER enabled");

configs.putIfAbsent(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, STRIMZI_OAUTH_CALLBACK);
// Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin)
// May still cause warnings to be logged when the token will expire in less than SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS.
configs.putIfAbsent(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "0");

String jaasConfig = getAuthorization(BEARER)
.map(SASL_OAUTH_CONFIG_TEMPLATE::formatted)
.orElseThrow(() -> new NotAuthorizedException(BEARER.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value());
}

void configureBasic(Map<String, Object> configs, String template) {
void configureBasic(SaslJaasConfigCredential credential, Map<String, Object> configs) {
log.trace("SASL/SCRAM enabled");

String jaasConfig = getBasicAuthentication()
.map(template::formatted)
.orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

Optional<String[]> getBasicAuthentication() {
return getAuthorization(BASIC)
.map(Base64.getDecoder()::decode)
.map(String::new)
.filter(authn -> authn.indexOf(':') >= 0)
.map(authn -> new String[] {
authn.substring(0, authn.indexOf(':')),
authn.substring(authn.indexOf(':') + 1)
})
.filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
}

Optional<String> getAuthorization(String scheme) {
return Optional.ofNullable(headers.getHeaderString(HttpHeaders.AUTHORIZATION))
.filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
.map(header -> header.substring(scheme.length()));
configs.put(SaslConfigs.SASL_JAAS_CONFIG, credential.value());
}

private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
api/src/main/java/com/github/streamshub/console/api/ConsumerGroupsResource.java
@@ -36,11 +36,14 @@
import com.github.streamshub.console.api.model.ConsumerGroup;
import com.github.streamshub.console.api.model.ConsumerGroupFilterParams;
import com.github.streamshub.console.api.model.ListFetchParams;
import com.github.streamshub.console.api.security.Authorized;
import com.github.streamshub.console.api.security.ResourcePrivilege;
import com.github.streamshub.console.api.service.ConsumerGroupService;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.FieldFilter;
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;
import com.github.streamshub.console.config.security.Privilege;

import io.xlate.validation.constraints.Expression;

@@ -67,6 +70,8 @@ public class ConsumerGroupsResource {
@APIResponseSchema(ConsumerGroup.ListResponse.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.LIST)
public CompletionStage<Response> listConsumerGroups(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
@@ -132,6 +137,8 @@ public CompletionStage<Response> listConsumerGroups(
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
@Authorized
@ResourcePrivilege(Privilege.GET)
public CompletionStage<Response> describeConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
@@ -200,6 +207,8 @@ public CompletionStage<Response> describeConsumerGroup(
node = { "data", "id" },
payload = ErrorCategory.InvalidResource.class,
validationAppliesTo = ConstraintTarget.PARAMETERS)
@Authorized
@ResourcePrivilege(Privilege.UPDATE)
public CompletionStage<Response> patchConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
@@ -244,6 +253,8 @@ public CompletionStage<Response> patchConsumerGroup(
@Path("{groupId}")
@DELETE
@APIResponseSchema(responseCode = "204", value = Void.class)
@Authorized
@ResourcePrivilege(Privilege.DELETE)
public CompletionStage<Response> deleteConsumerGroup(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
