Skip to content

Commit

Permalink
Merge pull request #376 from DependencyTrack/issue-656
Browse files Browse the repository at this point in the history
Support pass-through properties for Kafka Streams and Kafka Producer
  • Loading branch information
nscuro authored Oct 21, 2023
2 parents adac0b8 + 5b70548 commit 5f951a4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class KafkaProducerInitializer implements ServletContextListener {
Expand Down Expand Up @@ -83,7 +84,7 @@ public static void tearDown() {
private static Producer<byte[], byte[]> createProducer() {
final var properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.getInstance().getProperty(ConfigKey.KAFKA_BOOTSTRAP_SERVERS));
properties.put(ProducerConfig.CLIENT_ID_CONFIG,Config.getInstance().getProperty(ConfigKey.APPLICATION_ID));
properties.put(ProducerConfig.CLIENT_ID_CONFIG, Config.getInstance().getProperty(ConfigKey.APPLICATION_ID));
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);
Expand All @@ -98,6 +99,17 @@ private static Producer<byte[], byte[]> createProducer() {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Config.getInstance().getProperty(ConfigKey.KEY_STORE_PASSWORD));
}
}

final Map<String, String> passThroughProperties = Config.getInstance().getPassThroughProperties("kafka.producer");
for (final Map.Entry<String, String> passThroughProperty : passThroughProperties.entrySet()) {
final String key = passThroughProperty.getKey().replaceFirst("^kafka\\.producer\\.", "");
if (ProducerConfig.configNames().contains(key)) {
properties.put(key, passThroughProperty.getValue());
} else {
LOGGER.warn("%s is not a known Producer property; Ignoring".formatted(key));
}
}

return new KafkaProducer<>(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,29 @@
import javax.servlet.ServletContextListener;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KafkaStreamsInitializer implements ServletContextListener {

private static final Logger LOGGER = Logger.getLogger(KafkaStreamsInitializer.class);
private static final Duration DRAIN_TIMEOUT_DURATION =
Duration.parse(Config.getInstance().getProperty(ConfigKey.KAFKA_STREAMS_DRAIN_TIMEOUT_DURATION));
private static final Duration DRAIN_TIMEOUT_DURATION;
private static final Pattern CONSUMER_PREFIX_PATTERN;
private static final Pattern PRODUCER_PREFIX_PATTERN;

// One-time setup of the drain timeout and the prefix-matching patterns used to
// validate pass-through properties against Consumer/Producer config names.
static {
    // ISO-8601 duration (Duration.parse format) read from application config.
    DRAIN_TIMEOUT_DURATION = Duration.parse(
            Config.getInstance().getProperty(ConfigKey.KAFKA_STREAMS_DRAIN_TIMEOUT_DURATION));

    // Matches any Kafka Streams consumer prefix at the start of a property key;
    // prefixes are quoted so their literal dots are not treated as regex wildcards.
    CONSUMER_PREFIX_PATTERN = Pattern.compile("^("
            + Pattern.quote(StreamsConfig.CONSUMER_PREFIX) + "|"
            + Pattern.quote(StreamsConfig.GLOBAL_CONSUMER_PREFIX) + "|"
            + Pattern.quote(StreamsConfig.MAIN_CONSUMER_PREFIX) + ")");

    // Matches the Kafka Streams producer prefix at the start of a property key.
    PRODUCER_PREFIX_PATTERN = Pattern.compile("^%s".formatted(Pattern.quote(StreamsConfig.PRODUCER_PREFIX)));
}

private static KafkaStreams STREAMS;
private static KafkaStreamsMetrics STREAMS_METRICS;
Expand Down Expand Up @@ -91,6 +107,28 @@ static Properties getDefaultProperties() {
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.put(ProducerConfig.ACKS_CONFIG, "all");

final Map<String, String> passThroughProperties = Config.getInstance().getPassThroughProperties("kafka.streams");
for (final Map.Entry<String, String> passThroughProperty : passThroughProperties.entrySet()) {
final String key = passThroughProperty.getKey().replaceFirst("^kafka\\.streams\\.", "");
if (StreamsConfig.configDef().names().contains(key)) {
properties.put(key, passThroughProperty.getValue());
} else {
final Matcher consumerPrefixMatcher = CONSUMER_PREFIX_PATTERN.matcher(key);
final Matcher producerPrefixMatcher = PRODUCER_PREFIX_PATTERN.matcher(key);

final boolean isValidConsumerProperty = ConsumerConfig.configNames().contains(key)
|| (consumerPrefixMatcher.find() && ConsumerConfig.configNames().contains(consumerPrefixMatcher.replaceFirst("")));
final boolean isValidProducerProperty = ProducerConfig.configNames().contains(key)
|| (producerPrefixMatcher.find() && ProducerConfig.configNames().contains(producerPrefixMatcher.replaceFirst("")));
if (isValidConsumerProperty || isValidProducerProperty) {
properties.put(key, passThroughProperty.getValue());
} else {
LOGGER.warn("%s is not a known Streams, Consumer, or Producer property; Ignoring".formatted(key));
}
}
}

return properties;
}

Expand Down

0 comments on commit 5f951a4

Please sign in to comment.