KafkaProducer.produce fails fast #1340

Open · wants to merge 7 commits into base: series/3.x
Conversation

@rodrigo-molina commented Aug 5, 2024

Description

Introduce KafkaProducer.failFastProduce producer configuration:

/**
* Controls whether [[fs2.kafka.KafkaProducer.produce]] fails immediately if any
* [[org.apache.kafka.clients.producer.KafkaProducer.send]] callback resolves with error.
*
* When set to `true`, the `produce` method will fail fast, returning an error as soon as any
* record in the [[ProducerRecords]] fails to be sent.
*
* The default value is `false`, meaning the `produce` method will not fail fast and will
* continue processing other records even if some callbacks fail.
*/
def failFastProduce: Boolean

Proposed solution

When failFastProduce is set to true, each fs2.kafka.producer.produce call of ProducerRecords creates a scala.concurrent.Promise[Throwable]. This Promise resolution is raced against calling org.apache.kafka.clients.producer.KafkaProducer.send per record. Any failing callback will complete the promise with the failure, canceling the rest of the pending org.apache.kafka.clients.producer.KafkaProducer.send and in-flight callbacks.

if (failFastProduce)
  Async[F]
    .delay(Promise[Throwable]())
    .flatMap { produceRecordError =>
      Async[F]
        .race(
          Async[F].fromFutureCancelable(
            Async[F].delay((produceRecordError.future, Async[F].unit))
          ),
          produceRecords(produceRecordError.some)
        )
        .rethrow
    }
else produceRecords(None)

Explanation

fs2.kafka.KafkaProducer.produce returns F[F[...]], where the outer effect invokes the underlying org.apache.kafka.clients.producer.KafkaProducer.send and attaches promises to the corresponding callbacks, and the inner effect waits for the callbacks to resolve. In use cases where all the records in the batch need to be confirmed, any callback failure will cause the final effect to fail; failing fast achieves the same outcome sooner.
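
For reference, a caller that needs every record confirmed can flatten the nested effect (a sketch; `producer` and `records` are assumed to be in scope, with the usual cats syntax imports):

```scala
// The outer effect enqueues the sends; the inner effect completes once
// every broker callback has resolved. `flatten` sequences both.
val confirmed: F[ProducerResult[K, V]] =
  producer.produce(records).flatten
```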

For non-transient errors such as an invalid username or password, or a non-existent topic, org.apache.kafka.clients.producer.KafkaProducer.send blocks for max.block.ms, which defaults to 60 seconds. For example, an fs2.kafka.producer attempting to produce N ProducerRecords will take at least N * 60 seconds to fail. In other words, a faulty Kafka producer takes considerable time to fail because it repeatedly calls the brokers for metadata synchronization.
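
As a partial mitigation available today, the bound itself is configurable through the standard client property (a sketch, assuming fs2-kafka's `ProducerSettings#withProperty` passthrough for arbitrary client properties and a `producerSettings` value in scope):

```scala
import org.apache.kafka.clients.producer.ProducerConfig

// Lower max.block.ms from its 60 s default so a misconfigured producer
// fails after 5 s per record instead of 60 s. This caps the wait but,
// unlike failFastProduce, still pays the cost once per record.
val settings = producerSettings
  .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
```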


@aartigao (Contributor) commented Aug 7, 2024

Hey Rodrigo! Thanks for your contribution 🙌🏽

I have some doubts about this approach, though. What if, as a user, I don't want it to fail fast (because I don't care if some records in the batch fail)? That's a valid scenario too. As an example, at $WORK we constantly push the locations of couriers coming from the mobile app, and we don't want a single record to ruin the whole batch (because a new location will be sent within seconds anyway).

But I see your point here, and it's a valid scenario too. The thing is, to me, this is a behavioral change that could potentially affect current users of the lib, and we can't introduce this kind of change straight away. I'd be more than happy to add it as a feature under a configuration setting in ProducerSettings (disabled by default, to stay compatible with all users). Something like failFastBatch or failFastSend, or other naming suggestions.

Let me know what you think about it 😉

@rodrigo-molina (Author) commented

Hey Alan, thanks for your detailed response.

As you mentioned, I agree with the importance of not disrupting the current behavior, especially since the library supports fire-and-forget scenarios.

I'll work on making the producer fail-fast feature configurable. What do you think about the name failFastProduce? It aligns with KafkaProducer.produce.

@aartigao (Contributor) commented

Sounds good!

@rodrigo-molina (Author) commented

I've updated the code and the PR description accordingly.

Please let me know your thoughts 😄

@erikvanoosten commented

Another relevant question: why does the kafka client not fail immediately for non-transient errors such as invalid username, password, and non-existing topics?

@rodrigo-molina (Author) commented

> Another relevant question: why does the kafka client not fail immediately for non-transient errors such as invalid username, password, and non-existing topics?

Thanks @erikvanoosten for chipping in.

In my opinion, non-transient errors should fail fast since they will continue to fail (and block the producer with repeated metadata request calls) until the error is resolved externally.

While this discussion is relevant to the scope of this PR, it may be interpreted as an extra feature. On the one hand, there's the idea of making the producer fail fast on any failure within the batch; on the other hand, there's the possibility of failing fast only on specific, predefined errors (assumed to be non-recoverable).

A proposal to address both is allowing users to configure which specific exceptions trigger the fail-fast behavior. This way, users can define what qualifies as a non-transient error for their specific use case and/or decide which errors to fail fast on. To simplify this for users, we could even provide a set of default error types known to be non-transient in Kafka. To avoid changing the producer's current semantics, we can disable the feature by default (no exception is failed fast by default). Things I like about this proposal:

  1. The fail fast feature becomes highly configurable for users
  2. Some defaults provide recommendations for basic use cases (like fail fast on non-transient errors) and prevent users from deep diving into some Kafka exceptions
  3. Current KafkaProducer.produce semantics are not changed

Challenges to the proposal:

  1. For point 2 of "things I like", distinguishing some errors can be tricky and inconsistent across Kafka client versions. For instance, a topic not found surfaces as a TimeoutException with the message "Topic $nonExistentTopic not present in metadata after $max.block.ms ms.". If we don't want to maintain this as part of the library, we can skip providing the defaults; the proposal still gives users enough autonomy to implement "fail fast on non-transient errors" in their own context.

Finally, as an implementation detail, fail-fast errors can be provided as a partial function in the ProducerSettings. Something like:

ProducerSettings {
  ...

  /**
    * Controls whether [[fs2.kafka.KafkaProducer.produce]] fails immediately if any
    * [[org.apache.kafka.clients.producer.KafkaProducer.send]] callback resolves with error.
    *
    * When set to `true`, the `produce` method will fail fast, returning an error as soon as any
    * record in the [[ProducerRecords]] fails to be sent.
    *
    * The default value is `false`, meaning the `produce` method will not fail fast and will
    * continue processing other records even if some callbacks fail.
    */
  def failFastProduce: Boolean

  /**
    * Controls which exceptions make [[fs2.kafka.KafkaProducer.produce]] fail immediately.
    *
    * This setting is only applied when [[failFastProduce]] is set to `true`.
    *
    * The default value is to fail fast on all exceptions.
    */
  def failFastProduceOn: PartialFunction[Throwable, Boolean]

  ...
}
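
By way of illustration, a user-supplied `failFastProduceOn` targeting only errors commonly treated as non-transient might look like this (a sketch; the exception choices are assumptions, not library defaults):

```scala
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}

// Fail fast on authentication failures and on the TimeoutException that
// signals a missing topic; everything else keeps the current semantics.
val failFastProduceOn: PartialFunction[Throwable, Boolean] = {
  case _: AuthenticationException => true
  case e: TimeoutException if e.getMessage.contains("not present in metadata") => true
}
```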

Please let me know your thoughts.

@erikvanoosten commented Aug 16, 2024

> While this discussion is relevant to the scope of this PR, it may be interpreted as an extra feature...

Yeah, I agree that this PR is a nice and performant solution (even though it is a kludge, i.e. an inelegant workaround). We can't wait for the Kafka maintainers to fix this properly (if they ever will).

@@ -224,7 +241,8 @@ object ProducerSettings {
     override val valueSerializer: Resource[F, ValueSerializer[F, V]],
     override val customBlockingContext: Option[ExecutionContext],
     override val properties: Map[String, String],
-    override val closeTimeout: FiniteDuration
+    override val closeTimeout: FiniteDuration,
+    override val failFastProduce: Boolean = false
Contributor:

Instead of defining the default value here, wouldn't it be more convenient to set it in the `create` method where the other default values are set?

Author:

It's addressed in commit 5472f25.

if (settings.producerSettings.failFastProduce)
  Async[F]
    .delay(Promise[Throwable]())
    .flatMap(produceRecordError => produceRecords(produceRecordError.some))
Contributor:

A Promise is created but never listened to (if I read correctly). Is that deliberate?

@erikvanoosten commented Aug 31, 2024
The author of this PR is on holiday, so I'll try to answer for him: it seems that the race present in modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala was not added here. My guess is that this was an oversight.

Author:

Yes, the race is missing. I'll add it.

@rodrigo-molina (Author) commented Sep 4, 2024
Good catch. It's added in commit 6442060.

I also added a test for it:

val result = intercept[RuntimeException] {
  TransactionalKafkaProducer
    .stream(settings)
    .evalMap(_.produce(Chunk.singleton(committable)))
    .compile
    .lastOrError
    .unsafeRunSync()
}
result shouldBe expectedErrorOnSecondRecord
assert(transactionAborted, "The transaction should be aborted")

An interesting discovery is that KafkaProducer.produce and TransactionalKafkaProducer.produce behave differently. Specifically, KafkaProducer.produce defers the responsibility of waiting for callback resolutions to the user, as the F[F[_]] result. In contrast, TransactionalKafkaProducer.produce waits for the callbacks.

def produce(
  records: ProducerRecords[K, V]
): F[F[ProducerResult[K, V]]]

def produce(
  records: TransactionalProducerRecords[F, K, V]
): F[ProducerResult[K, V]]

val produce = records
  .traverse(
    KafkaProducer.produceRecord(keySerializer, valueSerializer, producer, blocking)
  )
  .flatMap(_.sequence)

records
  .traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
  .map(_.sequence)

Note the `flatMap` vs `map` difference.
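
The difference can be modelled with plain cats-effect, away from Kafka (a sketch with a hypothetical `send` step; the outer `IO` stands for enqueueing the record, the inner one for the broker acknowledgement):

```scala
import cats.effect.IO
import cats.syntax.all._

// Hypothetical two-phase send: enqueue, then await acknowledgement.
def send(i: Int): IO[IO[Int]] = IO.pure(IO.pure(i))

val records = List(1, 2, 3)

// TransactionalKafkaProducer-style: flatMap awaits the acknowledgements
// eagerly, so the caller gets F[Result].
val awaited: IO[List[Int]] = records.traverse(send).flatMap(_.sequence)

// KafkaProducer-style: map leaves the acknowledgements nested, so the
// caller decides when (or whether) to await them, as F[F[Result]].
val deferred: IO[IO[List[Int]]] = records.traverse(send).map(_.sequence)
```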

Additionally, `BaseKafkaSpec` sets `KAFKA_AUTO_CREATE_TOPICS_ENABLE` to false: any topic required by a test must be explicitly created.
@@ -17,8 +17,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
  * the fs2-kafka `KafkaProducer`. This is needed in order to instantiate
  * [[fs2.kafka.KafkaProducer]] and [[fs2.kafka.TransactionalKafkaProducer]].<br><br>
  *
- * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However this
- * behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
+ * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However,
Author:

I couldn't help myself here. Please let me know if you prefer to keep this change in a different PR.
