Skip to content

Commit

Permalink
Return auth type in Kafka meta, treat PLAIN like SCRAM-SHA
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Jul 31, 2024
1 parent 495b82d commit 522fc0e
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 232 deletions.
5 changes: 3 additions & 2 deletions api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ create instances of both Prometheus and Kafka.

### Start Console API in Development Mode

Start the API in development mode from the repository root directory.
Start the API in development mode from the repository root directory. Ensure that the `console.config-path` property
points to a valid `console-config.yaml`. See [console-config-example.yaml](../console-config-example.yaml) for an example.

```shell
mvn -am -pl api quarkus:dev
mvn -am -pl api quarkus:dev -Dconsole.config-path=$(pwd)/console-config.yaml
```

### Using the Instance API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -62,6 +63,7 @@
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;

Expand Down Expand Up @@ -92,19 +94,21 @@
@ApplicationScoped
public class ClientFactory {

public static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
public static final String PLAIN = "PLAIN";
public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
public static final String SCRAM_SHA512 = "SCRAM-SHA-512";

private static final String BEARER = "Bearer ";
private static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ " required"
+ " oauth.access.token=\"%s\" ;";

private static final String BASIC = "Basic ";
private static final String SCRAM_SHA512 = "SCRAM-SHA-512";
private static final String SASL_SCRAM_CONFIG_TEMPLATE = ScramLoginModule.class.getName()
+ " required"
+ " username=\"%s\""
+ " password=\"%s\" ;";
private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";
private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName());
private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName());

static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
private final Function<String, NotFoundException> noSuchKafka =
Expand All @@ -129,6 +133,9 @@ public class ClientFactory {
@Inject
KafkaClusterService kafkaClusterService;

@Inject
ValidationProxy validationService;

@Inject
Instance<TrustAllCertificateManager> trustManager;

Expand Down Expand Up @@ -192,8 +199,9 @@ public ConsoleConfig produceConsoleConfig() {

return consoleConfig;
})
.map(validationService::validate)
.orElseGet(() -> {
log.infof("Console configuration not specified");
log.warn("Console configuration has not been specified using `console.config-path` property");
return new ConsoleConfig();
});
}
Expand All @@ -213,6 +221,7 @@ Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
consoleConfig.getKafka().getClusters()
.stream()
.filter(c -> cachedKafkaResource(c).isEmpty())
.filter(Predicate.not(KafkaClusterConfig::hasNamespace))
.forEach(clusterConfig -> putKafkaContext(contexts,
clusterConfig,
Optional.empty(),
Expand Down Expand Up @@ -309,19 +318,14 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
String clusterKey = clusterConfig.clusterKey();
String clusterId = Optional.ofNullable(clusterConfig.getId())
.or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
.orElse(null);
.orElseGet(clusterConfig::getName);

if (clusterId == null) {
log.warnf("""
Ignoring Kafka cluster %s. Cluster id value missing in \
configuration and no Strimzi Kafka resources found with matching \
name and namespace.""", clusterKey);
} else if (!replace && contexts.containsKey(clusterId)) {
if (!replace && contexts.containsKey(clusterId)) {
log.warnf("""
Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \
configuration must be unique and may not match id values of \
clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey);
} else if (truststoreRequired(adminConfigs)) {
} else if (kafkaResource.isPresent() && truststoreRequired(adminConfigs)) {
if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) {
log.warnf("""
Ignoring update to Kafka custom resource %s. Connection requires \
Expand All @@ -337,13 +341,17 @@ void putKafkaContext(Map<String, KafkaContext> contexts,

Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
.map(store -> {
.map(store -> store.getByKey(clusterConfig.clusterKey()))
.or(() -> {
String key = clusterConfig.clusterKey();
Kafka resource = store.getByKey(key);
if (resource == null) {
log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key);

if (kafkaInformer.isPresent()) {
log.warnf("Configuration references Kubernetes Kafka resource %s, but it was not found", key);
} else {
log.warnf("Configuration references Kubernetes Kafka resource %s, but Kubernetes access is disabled", key);
}
return resource;

return Optional.empty();
}) : Optional.empty();
}

Expand Down Expand Up @@ -438,7 +446,7 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
* configuration. The user must provide the login secrets
* in the request in that case.
*/
var adminConfigs = maybeAuthenticate(ctx.configs(Admin.class));
var adminConfigs = maybeAuthenticate(ctx, Admin.class);
var admin = adminBuilder.apply(adminConfigs);
return new KafkaContext(ctx, filter.apply(admin));
}
Expand All @@ -463,7 +471,7 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk
@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context.configs(Consumer.class));
var configs = maybeAuthenticate(context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;
}
Expand All @@ -475,7 +483,7 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consum
@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context.configs(Producer.class));
var configs = maybeAuthenticate(context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
}
Expand All @@ -484,11 +492,13 @@ public void producerDisposer(@Disposes Supplier<Producer<String, String>> produc
producer.get().close();
}

Map<String, Object> maybeAuthenticate(Map<String, Object> configs) {
Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Map<String, Object> configs = context.configs(clientType);

if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
&& !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
configs = new HashMap<>(configs);
configureAuthentication(configs);
configureAuthentication(context.saslMechanism(clientType), configs);
}

return configs;
Expand Down Expand Up @@ -665,13 +675,16 @@ void logConfig(String clientType, Map<String, Object> config) {
}
}

void configureAuthentication(Map<String, Object> configs) {
switch (configs.get(SaslConfigs.SASL_MECHANISM).toString()) {
void configureAuthentication(String saslMechanism, Map<String, Object> configs) {
switch (saslMechanism) {
case OAUTHBEARER:
configureOAuthBearer(configs);
break;
case SCRAM_SHA512:
configureScram(configs);
case PLAIN:
configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE);
break;
case SCRAM_SHA256, SCRAM_SHA512:
configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE);
break;
default:
throw new NotAuthorizedException("Unknown");
Expand All @@ -693,11 +706,11 @@ void configureOAuthBearer(Map<String, Object> configs) {
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

void configureScram(Map<String, Object> configs) {
void configureBasic(Map<String, Object> configs, String template) {
log.trace("SASL/SCRAM enabled");

String jaasConfig = getBasicAuthentication()
.map(SASL_SCRAM_CONFIG_TEMPLATE::formatted)
.map(template::formatted)
.orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));

configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
Expand Down
Loading

0 comments on commit 522fc0e

Please sign in to comment.