KafkaProducer.produce fails fast #1340

Merged (7 commits) on Oct 8, 2024

Changes from all commits
37 changes: 29 additions & 8 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -172,19 +172,38 @@ object KafkaProducer {
    withProducer: WithProducer[F],
    keySerializer: KeySerializer[F, K],
    valueSerializer: ValueSerializer[F, V],
-   records: ProducerRecords[K, V]
+   records: ProducerRecords[K, V],
+   failFastProduce: Boolean
  ): F[F[ProducerResult[K, V]]] =
    withProducer { (producer, blocking) =>
-     records
-       .traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
-       .map(_.sequence)
+     def produceRecords(produceRecordError: Option[Promise[Throwable]]) =
+       records
+         .traverse(
+           produceRecord(keySerializer, valueSerializer, producer, blocking, produceRecordError)
+         )
+         .map(_.sequence)
+
+     if (failFastProduce)
+       Async[F]
+         .delay(Promise[Throwable]())
+         .flatMap { produceRecordError =>
+           Async[F]
+             .race(
+               Async[F]
+                 .fromFutureCancelable(Async[F].delay(produceRecordError.future, Async[F].unit)),
+               produceRecords(produceRecordError.some)
+             )
+             .rethrow
+         }
+     else produceRecords(None)
    }
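
Note on the mechanism above: when failFastProduce is enabled, produce races two effects, a watcher built from a shared scala.concurrent.Promise (failed by any send callback, see produceRecord below) against the normal produce path, and rethrow surfaces whichever error wins. A minimal self-contained sketch of the same pattern, assuming a cats-effect version that provides Async#fromFutureCancelable (3.5+); failFast and demo are illustrative names, not fs2-kafka API:

import scala.concurrent.Promise

import cats.effect.{Async, IO}
import cats.syntax.all.*

// Race a shared error promise against `work`: if any callback fails the
// promise, the race ends immediately with that error.
def failFast[F[_], A](work: Option[Promise[Throwable]] => F[A])(implicit F: Async[F]): F[A] =
  F.delay(Promise[Throwable]()).flatMap { error =>
    F.race(
      // Completes only when the promise is failed; cancelling it is a no-op.
      F.fromFutureCancelable(F.delay((error.future, F.unit))),
      work(Some(error))
    ).rethrow
  }

// The "callback" fails the promise while the work itself never completes,
// so the race surfaces the error immediately instead of hanging.
val demo: IO[Either[Throwable, Unit]] =
  failFast[IO, Unit] { error =>
    IO(error.foreach(_.failure(new RuntimeException("callback failed")))) *> IO.never
  }.attempt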

  private[kafka] def produceRecord[F[_], K, V](
    keySerializer: KeySerializer[F, K],
    valueSerializer: ValueSerializer[F, V],
    producer: KafkaByteProducer,
-   blocking: Blocking[F]
+   blocking: Blocking[F],
+   produceRecordError: Option[Promise[Throwable]]
  )(implicit
    F: Async[F]
  ): ProducerRecord[K, V] => F[F[(ProducerRecord[K, V], RecordMetadata)]] =
@@ -196,9 +215,11 @@ object KafkaProducer {
        producer.send(
          javaRecord,
          { (metadata, exception) =>
-           if (exception == null)
-             promise.success((record, metadata))
-           else promise.failure(exception)
+           if (exception == null) { promise.success((record, metadata)) }
+           else {
+             promise.failure(exception)
+             produceRecordError.foreach(_.failure(exception))
+           }
          }
        )
      }.map(javaFuture =>
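
One standard-library detail worth keeping in mind when reading the callback above: a scala.concurrent.Promise can be completed at most once. A plain stdlib illustration (not PR code):

import scala.concurrent.Promise

val shared = Promise[Throwable]()
shared.failure(new RuntimeException("first"))                   // first completion wins
val accepted = shared.tryFailure(new RuntimeException("later")) // false: already completed
// A second shared.failure(...) call would throw IllegalStateException instead.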
modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
@@ -131,7 +131,8 @@ object KafkaProducerConnection {
          withProducer,
          keySerializer,
          valueSerializer,
-         records
+         records,
+         settings.failFastProduce
        )

      override def metrics: G[Map[MetricName, Metric]] =
26 changes: 24 additions & 2 deletions modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
@@ -215,6 +215,23 @@ sealed abstract class ProducerSettings[F[_], K, V] {
    */
  def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings[F, K, V]

+ /**
+  * Controls whether [[fs2.kafka.KafkaProducer.produce]] fails immediately if any
+  * [[org.apache.kafka.clients.producer.KafkaProducer.send]] callback resolves with an error.
+  *
+  * When set to `true`, the `produce` method will fail fast, returning an error as soon as any
+  * record in the [[ProducerRecords]] fails to be sent.
+  *
+  * The default value is `false`, meaning the `produce` method will not fail fast and will
+  * continue processing other records even if some callbacks fail.
+  */
+ def failFastProduce: Boolean
+
+ /**
+  * Creates a new [[ProducerSettings]] with the specified [[failFastProduce]].
+  */
+ def withFailFastProduce(failFastProduce: Boolean): ProducerSettings[F, K, V]
+
}

object ProducerSettings {
@@ -224,7 +241,8 @@ object ProducerSettings {
    override val valueSerializer: Resource[F, ValueSerializer[F, V]],
    override val customBlockingContext: Option[ExecutionContext],
    override val properties: Map[String, String],
-   override val closeTimeout: FiniteDuration
+   override val closeTimeout: FiniteDuration,
+   override val failFastProduce: Boolean
  ) extends ProducerSettings[F, K, V] {

    override def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] =
@@ -301,6 +319,9 @@ object ProducerSettings {
    ): ProducerSettings[F, K1, V1] =
      copy(keySerializer = keySerializer, valueSerializer = valueSerializer)

+   override def withFailFastProduce(failFastProduce: Boolean): ProducerSettings[F, K, V] =
+     copy(failFastProduce = failFastProduce)
+
}

private[this] def create[F[_], K, V](
@@ -314,7 +335,8 @@ object ProducerSettings {
      properties = Map(
        ProducerConfig.RETRIES_CONFIG -> "0"
      ),
-     closeTimeout = 60.seconds
+     closeTimeout = 60.seconds,
+     failFastProduce = false
    )

def apply[F[_], K, V](
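
For reference, enabling the new setting is a one-liner on the settings builder. A minimal usage sketch (the bootstrap address is a placeholder):

import cats.effect.IO
import fs2.kafka.*

val failFastSettings: ProducerSettings[IO, String, String] =
  ProducerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092") // placeholder
    .withFailFastProduce(true)              // default is false, see doc comment above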
modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -7,6 +7,7 @@
package fs2.kafka

import scala.annotation.nowarn
+import scala.concurrent.Promise

import cats.effect.{Async, Outcome, Resource}
import cats.effect.syntax.all.*
@@ -141,11 +142,35 @@ object TransactionalKafkaProducer {
    ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
      withProducer.exclusiveAccess { (producer, blocking) =>
        blocking(producer.beginTransaction()).bracketCase { _ =>
-         val produce = records
-           .traverse(
-             KafkaProducer.produceRecord(keySerializer, valueSerializer, producer, blocking)
-           )
-           .flatMap(_.sequence)
+         def produceRecords(produceRecordError: Option[Promise[Throwable]]) =
+           records
+             .traverse(
+               KafkaProducer.produceRecord(
+                 keySerializer,
+                 valueSerializer,
+                 producer,
+                 blocking,
+                 produceRecordError
+               )
+             )
+             .flatMap(_.sequence)
+
+         val produce =
+           if (settings.producerSettings.failFastProduce)
+             Async[F]
+               .delay(Promise[Throwable]())
+               .flatMap { produceRecordError =>
+                 Async[F]
+                   .race(
+                     Async[F].fromFutureCancelable(
+                       Async[F].delay(produceRecordError.future, Async[F].unit)
+                     ),
+                     produceRecords(produceRecordError.some)
+                   )
+                   .rethrow
+               }
+           else
+             produceRecords(None)

sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
} {
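
The transactional path reuses the same race, and because the error is raised inside bracketCase the transaction is aborted (the spec below asserts this). A sketch of enabling fail-fast for a transactional producer (transactionalId and address are placeholders):

import cats.effect.IO
import fs2.kafka.*

val txSettings: TransactionalProducerSettings[IO, String, String] =
  TransactionalProducerSettings(
    transactionalId = "example-tx-id", // placeholder
    producerSettings = ProducerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // placeholder
      .withFailFastProduce(true)
  )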
modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
@@ -17,8 +17,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
* the fs2-kafka `KafkaProducer`. This is needed in order to instantiate
* [[fs2.kafka.KafkaProducer]] and [[fs2.kafka.TransactionalKafkaProducer]].<br><br>
*
- * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However this
- * behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
+ * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However,
+ * this behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
* instance in lexical scope.
*/
trait MkProducer[F[_]] {
25 changes: 25 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
@@ -11,6 +11,9 @@ import cats.effect.IO
import cats.syntax.all.*
import fs2.{Chunk, Stream}

+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.errors.TimeoutException
+
final class KafkaProducerSpec extends BaseKafkaSpec {

describe("creating producers") {
@@ -259,6 +262,28 @@ final class KafkaProducerSpec extends BaseKafkaSpec {
}
}

it("should fail fast to produce records with multiple") {
withTopic { topic =>
val nonExistentTopic = s"non-existent-$topic"
val toProduce = (0 until 1000).map(n => s"key-$n" -> s"value->$n").toList
val settings = producerSettings[IO]
.withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500")
.withFailFastProduce(true)

val error = intercept[TimeoutException] {
(for {
producer <- KafkaProducer.stream(settings)
records = ProducerRecords(toProduce.map { case (key, value) =>
ProducerRecord(nonExistentTopic, key, value)
})
result <- Stream.eval(producer.produce(records).flatten)
} yield result).compile.lastOrError.unsafeRunSync()
}

error.getMessage shouldBe s"Topic $nonExistentTopic not present in metadata after 500 ms."
}
}

it("should get metrics") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
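
The test above relies on produce's two-stage contract: the outer effect hands records to the underlying Java producer (which may block on metadata, hence MAX_BLOCK_MS above), and the inner effect completes once the callbacks resolve, failing on the first callback error when failFastProduce is enabled. A small sketch of that contract (sendAndWait is an illustrative helper, not fs2-kafka API):

import cats.Monad
import cats.syntax.all.*
import fs2.kafka.*

// flatten waits for both stages: enqueue (outer) and acknowledgement (inner).
def sendAndWait[F[_]: Monad, K, V](
    producer: KafkaProducer[F, K, V],
    records: ProducerRecords[K, V]
): F[ProducerResult[K, V]] =
  producer.produce(records).flatten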
modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
@@ -168,8 +168,16 @@ final class ProducerSettingsSpec extends BaseSpec {
)
}
}

it("should provide failFastProduce default value") {
assert(settings.failFastProduce == false)
}

it("should be able to set failFastProduce") {
assert(settings.withFailFastProduce(true).failFastProduce == true)
}
}

val settings = ProducerSettings[IO, String, String]
private val settings = ProducerSettings[IO, String, String]

}
modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -6,7 +9,7 @@

package fs2.kafka

+import java.nio.charset.StandardCharsets
import java.util
+import java.util.concurrent.{CompletableFuture, Future}
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.duration.*
@@ -19,6 +21,8 @@ import fs2.kafka.internal.converters.collection.*
import fs2.kafka.producer.MkProducer

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
+import org.apache.kafka.clients.producer
+import org.apache.kafka.clients.producer.{Callback, ProducerConfig, RecordMetadata}
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.TopicPartition
@@ -401,6 +405,91 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}

it("should fail fast to produce records with multiple") {
val (key0, value0) = "key-0" -> "value-0"
val (key1, value1) = "key-1" -> "value-1"
val (key2, value2) = "key-2" -> "value-2"
var transactionAborted = false
val expectedErrorOnSecondRecord = new RuntimeException("~Failed to produce second record~")

withTopic { topic =>
createCustomTopic(topic)

implicit val mk: MkProducer[IO] = new MkProducer[IO] {

def apply[G[_]](settings: ProducerSettings[G, ?, ?]): IO[KafkaByteProducer] =
IO.delay {
new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
(settings.properties: Map[String, AnyRef]).asJava,
new ByteArraySerializer,
new ByteArraySerializer
) {
override def send(
record: producer.ProducerRecord[Array[Byte], Array[Byte]],
callback: Callback
): Future[RecordMetadata] = {
val key = new String(record.key(), StandardCharsets.UTF_8)
val futureResult = CompletableFuture
.completedFuture(new RecordMetadata(new TopicPartition(topic, 0), 0, 0, 0, 0, 0))

key match {
case `key0` => futureResult

case `key1` =>
callback.onCompletion(null, expectedErrorOnSecondRecord)
Thread.sleep(500) // ensure the callback completes and the fail-fast mechanism is triggered
futureResult.completeExceptionally(expectedErrorOnSecondRecord)
futureResult

case key =>
fail(s"Unexpected key: $key, the producer should not produce any record after key $key1.")
}
}

override def abortTransaction(): Unit = {
transactionAborted = true
super.abortTransaction()
}
}
}

}

val producerRecords = List(
ProducerRecord(topic, key0, value0),
ProducerRecord(topic, key1, value1),
ProducerRecord(topic, key2, value2)
)
val committableOffset = CommittableOffset[IO](
new TopicPartition("topic-consumer", 0),
new OffsetAndMetadata(0),
Some("consumer-group"),
_ => IO.raiseError(new RuntimeException("Commit should not be called")).void
)
val committable = CommittableProducerRecords(producerRecords, committableOffset)

val settings = TransactionalProducerSettings(
transactionalId = s"fail-fast-$topic",
producerSettings = producerSettings[IO]
.withRetries(Int.MaxValue)
.withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
.withFailFastProduce(true)
)

val result = intercept[RuntimeException] {
TransactionalKafkaProducer
.stream(settings)
.evalMap(_.produce(Chunk.singleton(committable)))
.compile
.lastOrError
.unsafeRunSync()
}

result shouldBe expectedErrorOnSecondRecord
assert(transactionAborted, "The transaction should be aborted")
}
}

it("should get metrics") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)