KafkaProducer.produce fails fast #1340
base: series/3.x
Conversation
Hey Rodrigo! Thanks for your contribution 🙌🏽 I have some doubts about this approach, though. What if, as a user, I don't want it to fail fast (because I don't care if some records in the batch fail)? That is a valid scenario too. As an example, at $WORK we constantly push the locations of couriers coming from the mobile app, and we don't want a single record to ruin the whole batch (because in a matter of seconds, a new location will be sent anyway). But I see your point here, and it's a valid scenario too. Thing is, to me, this is a behavioral change that could potentially affect current users of the lib. And we can't introduce this kind of change straight away. I'd be more than happy to add it as a feature under a configuration setting. Let me know what you think about it 😉

Hey Alan, thanks for your detailed response. As you mentioned, I agree with the importance of not disrupting the current behavior, especially since the library supports fire-and-forget scenarios. I'll work on making the producer's fail-fast feature configurable. What do you think about the name

Sounds good!

I've updated the code and the PR description accordingly. Please let me know your thoughts 😄
Another relevant question: why does the Kafka client not fail immediately for non-transient errors such as an invalid username, password, or non-existing topics?
Thanks @erikvanoosten for chipping in. In my opinion, non-transient errors should fail fast, since they will continue to fail (and block the producer with repeated metadata request calls) until the error is resolved externally. While this discussion is relevant to the scope of this PR, it may be interpreted as an extra feature. On the one hand, there's the idea of making the producer fail fast on any failure within the batch; on the other hand, there's the possibility of failing fast only on specific, predefined errors (assumed to be non-recoverable). A proposal to address both is to allow users to configure which specific exceptions trigger the fail-fast behavior. This way, users can define what qualifies as a non-transient error for their specific use case and/or decide which errors to fail fast on. To simplify this for users, we could even provide a set of default error types known to be non-transient in Kafka. To avoid changing the producer's current semantics, the feature can be disabled by default (no exception triggers fail-fast by default). Things I like about this proposal:

Challenges to the proposal:
Finally, as an implementation detail, fail-fast errors can be provided as a partial function in the ProducerSettings. Something like:

ProducerSettings {
...
/**
* Controls whether [[fs2.kafka.KafkaProducer.produce]] fails immediately if any
* [[org.apache.kafka.clients.producer.KafkaProducer.send]] callback resolves with error.
*
* When set to `true`, the `produce` method will fail fast, returning an error as soon as any
* record in the [[ProducerRecords]] fails to be sent.
*
* The default value is `false`, meaning the `produce` method will not fail fast and will
* continue processing other records even if some callbacks fail.
*/
def failFastProduce: Boolean
/**
* Controls which exceptions make [[fs2.kafka.KafkaProducer.produce]] fail immediately.
*
* This setting is only applied when [[failFastProduce]] is set to `true`.
*
* The default is to fail fast on all exceptions.
*/
def failFastProduceOn: PartialFunction[Throwable, Boolean]
...
}

Please let me know your thoughts.
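If a setting like the proposed `failFastProduceOn` existed, the user-supplied partial function could look like the sketch below. The error classes are hypothetical stand-ins (plain Scala, no Kafka dependency) just to show the fallback semantics via `applyOrElse`:

```scala
// Hypothetical stand-ins for Kafka's non-transient error types.
final class AuthenticationError(msg: String) extends RuntimeException(msg)
final class UnknownTopicError(msg: String)   extends RuntimeException(msg)

// The partial function a user would supply: which errors trigger fail-fast.
val failFastOn: PartialFunction[Throwable, Boolean] = {
  case _: AuthenticationError => true
  case _: UnknownTopicError   => true
}

// Errors not matched by the partial function fall back to `false`
// (no fail-fast), via `applyOrElse`.
def shouldFailFast(e: Throwable): Boolean =
  failFastOn.applyOrElse(e, (_: Throwable) => false)

println(shouldFailFast(new AuthenticationError("bad credentials"))) // true
println(shouldFailFast(new RuntimeException("transient hiccup")))   // false
```

The partial-function encoding keeps the default conservative: anything the user did not explicitly list is treated as transient and does not abort the batch.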
Yeah I agree that this PR is a nice and performant solution (even though it is a kludge, as in, an inelegant solution). We can't wait for the kafka maintainers to fix this properly (if they will at all). |
@@ -224,7 +241,8 @@ object ProducerSettings {
     override val valueSerializer: Resource[F, ValueSerializer[F, V]],
     override val customBlockingContext: Option[ExecutionContext],
     override val properties: Map[String, String],
-    override val closeTimeout: FiniteDuration
+    override val closeTimeout: FiniteDuration,
+    override val failFastProduce: Boolean = false
Instead of defining the default value here, wouldn't it be more convenient to set it in the `create` method where the other default values are set?
It's addressed in commit: 5472f25
if (settings.producerSettings.failFastProduce)
  Async[F]
    .delay(Promise[Throwable]())
    .flatMap(produceRecordError => produceRecords(produceRecordError.some))
A `Promise` is created but never listened to (if I read correctly). Is that deliberate?
The author of this PR is on holiday, so I'll try to answer for him: it seems that the race that is present in modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala was not added here. It is my guess that this was an oversight.
Yes, the race is missing. I'll add it.
Good catch. It's added in commit 6442060.
I also added a test for it:
fs2-kafka/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
Lines 479 to 489 in 6442060
val result = intercept[RuntimeException] {
  TransactionalKafkaProducer
    .stream(settings)
    .evalMap(_.produce(Chunk.singleton(committable)))
    .compile
    .lastOrError
    .unsafeRunSync()
}
result shouldBe expectedErrorOnSecondRecord
assert(transactionAborted, "The transaction should be aborted")
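The mechanism under discussion can be modeled in plain Scala (the real implementation races with cats-effect, not `scala.concurrent`; this is only a sketch): the first failing send fails a shared promise, and racing the batch against that promise surfaces the error without waiting for the remaining callbacks.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Shared promise completed by the first failing "send" callback.
val firstError = Promise[Nothing]()

// Toy stand-in for producing one record; record 2 fails.
def send(i: Int): Future[Int] =
  if (i == 2) {
    val e = new RuntimeException(s"record $i failed")
    firstError.tryFailure(e) // signal the batch-wide failure
    Future.failed(e)
  } else Future.successful(i)

val batch = Future.sequence((1 to 5).map(send))

// The race: whichever side completes first wins, so a single failure
// short-circuits waiting on all outstanding callbacks.
val raced = Future.firstCompletedOf(Seq(batch, firstError.future))

val outcome = Await.ready(raced, 1.second).value.get
println(outcome.isFailure) // true
```

Without the race, the equivalent code would only observe the failure after every callback in the batch resolved, which is exactly the behavior this PR avoids.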
An interesting discovery is that `KafkaProducer.produce` and `TransactionalKafkaProducer.produce` behave differently. Specifically, `KafkaProducer.produce` defers the responsibility of waiting for callback resolutions to the user, as the `F[F[_]]` result. In contrast, `TransactionalKafkaProducer.produce` waits for the callbacks.
fs2-kafka/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
Lines 45 to 47 in 64a751e
def produce(
  records: ProducerRecords[K, V]
): F[F[ProducerResult[K, V]]]
fs2-kafka/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
Lines 41 to 43 in 64a751e
def produce(
  records: TransactionalProducerRecords[F, K, V]
): F[ProducerResult[K, V]]
fs2-kafka/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
Lines 144 to 148 in 64a751e
val produce = records
  .traverse(
    KafkaProducer.produceRecord(keySerializer, valueSerializer, producer, blocking)
  )
  .flatMap(_.sequence)
fs2-kafka/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
Lines 178 to 180 in 64a751e
records
  .traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
  .map(_.sequence)
Note the `flatMap` and `map` difference.
Description: `fs2.kafka.KafkaProducer.produce` fails immediately when any `org.apache.kafka.clients.producer.KafkaProducer.send` callback fails for the provided `ProducerRecords`. Additionally, `BaseKafkaSpec` sets `KAFKA_AUTO_CREATE_TOPICS_ENABLE` to false: any topic required by a test must be explicitly created.

Proposed solution: Each `fs2.kafka.producer.produce` call of `ProducerRecords` creates a `scala.concurrent.Promise[Throwable]`. This Promise's resolution is raced against calling `org.apache.kafka.clients.producer.KafkaProducer.send` per record. Any failing callback will complete the promise with the failure, cancelling the rest of the pending `org.apache.kafka.clients.producer.KafkaProducer.send` calls and in-flight callbacks.

Explanation: `fs2.kafka.KafkaProducer.produce` returns `F[F[...]]`, where the first effect invokes the underlying `org.apache.kafka.clients.producer.KafkaProducer.send` and attaches promises to the corresponding callbacks. The inner effect waits for the callbacks' resolution. Any callback failure will cause the final effect resolution to fail, achieving the same final outcome sooner. In cases of non-transient errors, such as an invalid username or password or non-existent topics, `org.apache.kafka.clients.producer.KafkaProducer.send` blocks for max.block.ms, which defaults to 60 seconds. For example, an `fs2.kafka.producer` attempting to produce N `ProducerRecords` will take at least N * 60 seconds to fail. In other words, large `ProducerRecords` will take considerable time to fail while repeatedly calling brokers for metadata synchronisation.
Force-pushed from 502843e to 6442060.
@@ -17,8 +17,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
  * the fs2-kafka `KafkaProducer`. This is needed in order to instantiate
  * [[fs2.kafka.KafkaProducer]] and [[fs2.kafka.TransactionalKafkaProducer]].<br><br>
  *
- * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However this
- * behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
+ * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However,
I couldn't help myself here. Please let me know if you prefer to keep this change in a different PR.
Description

Introduce the `KafkaProducer.failFastProduce` producer configuration:

fs2-kafka/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
Lines 218 to 228 in aed7ab3

Proposed solution

When `failFastProduce` is set to `true`, each `fs2.kafka.producer.produce` call of `ProducerRecords` creates a `scala.concurrent.Promise[Throwable]`. This Promise's resolution is raced against calling `org.apache.kafka.clients.producer.KafkaProducer.send` per record. Any failing callback will complete the promise with the failure, canceling the rest of the pending `org.apache.kafka.clients.producer.KafkaProducer.send` calls and in-flight callbacks.

fs2-kafka/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
Lines 186 to 198 in 2559d96

Explanation

`fs2.kafka.KafkaProducer.produce` returns `F[F[...]]`, where the first effect invokes the underlying `org.apache.kafka.clients.producer.KafkaProducer.send` and attaches promises to the corresponding callbacks. The second effect waits for the callbacks' resolution. In use cases where all the records in the batch need to be confirmed, any callback failure will cause the final effect resolution to fail, achieving the same outcome sooner.

For non-transient errors such as an invalid username, password, or non-existent topics, `org.apache.kafka.clients.producer.KafkaProducer.send` blocks for max.block.ms, which defaults to 60 seconds. For example, an `fs2.kafka.producer` attempting to produce N `ProducerRecords` will take at least N * 60 seconds to fail. In other words, faulty Kafka producers will take considerable time to fail by repeatedly calling brokers for metadata synchronization.

See:
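A partial mitigation available today, independent of this PR, is lowering `max.block.ms` so each `send` gives up on metadata sooner. The sketch below assumes fs2-kafka's `ProducerSettings` API; the broker address is a placeholder:

```scala
import cats.effect.IO
import fs2.kafka._

// Sketch: cap how long send() may block waiting for metadata.
// Kafka's default for max.block.ms is 60000 (60 seconds).
val settings: ProducerSettings[IO, String, String] =
  ProducerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092") // placeholder address
    .withProperty("max.block.ms", "10000")  // give up after 10 s instead
```

This only shortens the per-record worst case; the total failure latency still grows with the batch size, which is what the fail-fast race addresses.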