Skip to content

Commit

Permalink
add topic config
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jul 5, 2023
1 parent 69b7a22 commit 9c88f1c
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/en/seatunnel-engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ map:
+ The configuration with the prefix 'consumer.override.' is used to override the configuration of the consumer
+ The configuration with the prefix 'producer.override.' is used to override the configuration of the producer
+ The configuration with the prefix 'admin.override.' is used to override the configuration of the admin
+ The configuration with the prefix 'topic.override.' is used to override the configuration of the topic

## 6. Config SeaTunnel Engine Client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public void initialize(Map<String, Object> config) {
KafkaConfiguration.setExtraConfiguration(
config,
KafkaConfigurationConstants.KAFKA_ADMIN_CONFIGS_PREFIX))
.topicConfigs(
KafkaConfiguration.setExtraConfiguration(
config,
KafkaConfigurationConstants.KAFKA_TOPIC_CONFIGS_PREFIX))
.build();

// Init serializer, default ProtoStuffSerializer
Expand Down Expand Up @@ -227,7 +231,7 @@ public Set<Object> storeAll(Map<Object, Object> all) {
public boolean delete(Object key) {
try {
byte[] bKey = convertToBytes(key);
// Set tombstone message
// Sending tombstone messages will be cleared during topic compact
Future<RecordMetadata> callback =
producer.send(
new ProducerRecord<>(kafkaConfiguration.getStorageTopic(), bKey, null));
Expand Down Expand Up @@ -323,6 +327,7 @@ private void poll(
throw e;
} catch (KafkaException e) {
log.error("Error polling: " + e);
throw e;
}
}

Expand All @@ -339,7 +344,7 @@ public Set<Object> loadAllKeys() {
public void destroy(boolean deleteAllFileFlag) {
log.info("start destroy IMapKafkaStorage, businessName is {}", businessName);
if (deleteAllFileFlag) {
// delete compact topic
// Delete compact topic
TopicAdmin admin = new TopicAdmin(kafkaConfiguration);
try {
admin.deleteTopic(kafkaConfiguration.getStorageTopic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class KafkaConfiguration implements Serializable {
private Map<String, Object> producerConfigs;
private Map<String, Object> consumerConfigs;
private Map<String, Object> adminConfigs;
private Map<String, Object> topicConfigs;

public static Map<String, Object> setExtraConfiguration(
Map<String, Object> config, String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public class KafkaConfigurationConstants {
public static final String KAFKA_CONSUMER_CONFIGS_PREFIX = "consumer.override.";
public static final String KAFKA_PRODUCER_CONFIGS_PREFIX = "producer.override.";
public static final String KAFKA_ADMIN_CONFIGS_PREFIX = "admin.override.";
public static final String KAFKA_TOPIC_CONFIGS_PREFIX = "topic.override.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void maybeCreateTopic(String topic) {
log.info("Creating topic '{}'", topic);
NewTopicBuilder newTopicBuilder =
NewTopicBuilder.defineTopic(topic)
.config(kafkaConfiguration.getTopicConfigs())
.compacted()
.partitions(kafkaConfiguration.getStorageTopicPartition())
.replicationFactor(
Expand Down

0 comments on commit 9c88f1c

Please sign in to comment.