Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Use idempotent producer once again to enable safe retries (#1540)
Browse files Browse the repository at this point in the history
The main objective is to enable retries, which we can only safely do when
using the idempotence feature.
  • Loading branch information
a1exsh authored Aug 3, 2023
1 parent 23c2ee8 commit ed2616e
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final Long DEFAULT_TOPIC_RETENTION = 100000000L;
private static final CleanupPolicy DEFAULT_CLEANUP_POLICY = CleanupPolicy.DELETE;
private static final int KAFKA_RETRIES = 10;
private static final boolean KAFKA_IDEMPOTENCE = false;
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_DELIVERY_TIMEOUT = 30000;
private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000;
Expand Down Expand Up @@ -110,7 +111,8 @@ public void setup() {
DEFAULT_CURATOR_MAX_LIFETIME_MS,
DEFAULT_CURATOR_ROTATION_MS);

kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_IDEMPOTENCE, KAFKA_REQUEST_TIMEOUT,
KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, KAFKA_ENABLE_AUTO_COMMIT,
KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "",
KAFKA_COMPRESSION_TYPE, TCP_SEND_BUFFER_SIZE, Optional.of(9093),
Expand Down
1 change: 1 addition & 0 deletions app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ nakadi:
producers.count: 1
time-lag-checker.consumer-pool.size: 0
retries: 0
idempotence: false
request.timeout.ms: 30000
instanceType: t2.large
poll.timeoutMs: 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ public Properties getKafkaProducerProperties() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);

producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaSettings.getRetries());
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaSettings.getIdempotence());
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaSettings.getRequestTimeoutMs());
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaSettings.getBufferMemory());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSettings.getBatchSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
public class KafkaSettings {

private final int retries;
private final boolean idempotence;

// kafka client requires this property to be int
// https://github.com/apache/kafka/blob/d9206500bf2f99ce93f6ad64c7a89483100b3b5f/clients/src/main/java/org/apache
// /kafka/clients/producer/ProducerConfig.java#L261
Expand All @@ -35,6 +37,7 @@ public class KafkaSettings {
private final Optional<String> kafkaPassword;
@Autowired
public KafkaSettings(@Value("${nakadi.kafka.retries}") final int retries,
@Value("${nakadi.kafka.idempotence}") final boolean idempotence,
@Value("${nakadi.kafka.request.timeout.ms}") final int requestTimeoutMs,
@Value("${nakadi.kafka.batch.size}") final int batchSize,
@Value("${nakadi.kafka.buffer.memory}") final long bufferMemory,
Expand All @@ -54,6 +57,7 @@ public KafkaSettings(@Value("${nakadi.kafka.retries}") final int retries,
@Value("${nakadi.kafka.username:#{null}}") final Optional<String> kafkaUsername,
@Value("${nakadi.kafka.password:#{null}}") final Optional<String> kafkaPassword) {
this.retries = retries;
this.idempotence = idempotence;
this.requestTimeoutMs = requestTimeoutMs;
this.batchSize = batchSize;
this.bufferMemory = bufferMemory;
Expand All @@ -77,6 +81,10 @@ public int getRetries() {
return retries;
}

public boolean getIdempotence() {
return idempotence;
}

public int getRequestTimeoutMs() {
return requestTimeoutMs;
}
Expand Down

0 comments on commit ed2616e

Please sign in to comment.