diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
index 8cd5770d7..2d38059e0 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -172,19 +172,39 @@ object KafkaProducer {
     withProducer: WithProducer[F],
     keySerializer: KeySerializer[F, K],
     valueSerializer: ValueSerializer[F, V],
-    records: ProducerRecords[K, V]
+    records: ProducerRecords[K, V],
+    failFastProduce: Boolean
   ): F[F[ProducerResult[K, V]]] =
     withProducer { (producer, blocking) =>
+      def produceRecords(produceRecordError: Option[Promise[Throwable]]) =
+        records
+          .traverse(
+            produceRecord(keySerializer, valueSerializer, producer, blocking, produceRecordError)
+          )
+          .map(_.sequence)
+
-      records
-        .traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
-        .map(_.sequence)
+      if (failFastProduce)
+        Async[F]
+          .delay(Promise[Throwable]())
+          .flatMap { produceRecordError =>
+            Async[F]
+              .race(
+                // loses the race as soon as any send callback completes the shared error promise
+                Async[F]
+                  .fromFutureCancelable(Async[F].delay((produceRecordError.future, Async[F].unit))),
+                produceRecords(produceRecordError.some)
+              )
+              .rethrow
+          }
+      else produceRecords(None)
     }
 
   private[kafka] def produceRecord[F[_], K, V](
     keySerializer: KeySerializer[F, K],
     valueSerializer: ValueSerializer[F, V],
     producer: KafkaByteProducer,
-    blocking: Blocking[F]
+    blocking: Blocking[F],
+    produceRecordError: Option[Promise[Throwable]]
   )(implicit
     F: Async[F]
   ): ProducerRecord[K, V] => F[F[(ProducerRecord[K, V], RecordMetadata)]] =
@@ -196,9 +216,12 @@ object KafkaProducer {
         producer.send(
           javaRecord,
           { (metadata, exception) =>
-            if (exception == null)
-              promise.success((record, metadata))
-            else promise.failure(exception)
+            if (exception == null) { promise.success((record, metadata)) }
+            else {
+              promise.failure(exception)
+              // tryFailure: another failed record may already have completed the shared promise
+              produceRecordError.foreach(_.tryFailure(exception))
+            }
           }
         )
       }.map(javaFuture =>
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
index 5e60eac49..666cf0e6a 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
@@ -131,7 +131,8 @@ object KafkaProducerConnection {
           withProducer,
           keySerializer,
           valueSerializer,
-          records
+          records,
+          settings.failFastProduce
         )
 
       override def metrics: G[Map[MetricName, Metric]] =
diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
index 8c841c044..e36c1cea7 100644
--- a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
@@ -215,6 +215,31 @@ sealed abstract class ProducerSettings[F[_], K, V] {
    */
   def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings[F, K, V]
 
+  /**
+   * Controls whether [[fs2.kafka.KafkaProducer.produce]] fails immediately when any
+   * [[org.apache.kafka.clients.producer.KafkaProducer.send]] callback completes with an error.
+   *
+   * When set to `true`, the `produce` method fails fast, returning an error as soon as any
+   * record in the [[ProducerRecords]] fails to be sent.
+   *
+   * The default value is `false`, meaning the `produce` method does not fail fast and keeps
+   * sending the remaining records even if some callbacks fail.
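+   *
+   * For example, a minimal sketch of enabling fail-fast behaviour (assuming a local broker
+   * and implicit `String` serializers):
+   * {{{
+   * ProducerSettings[IO, String, String]
+   *   .withBootstrapServers("localhost:9092")
+   *   .withFailFastProduce(true)
+   * }}}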
+   */
+  def failFastProduce: Boolean
+
+  /**
+   * Creates a new [[ProducerSettings]] with the specified [[failFastProduce]].
+   */
+  def withFailFastProduce(failFastProduce: Boolean): ProducerSettings[F, K, V]
+
 }
 
 object ProducerSettings {
@@ -224,7 +249,8 @@ private[this] final case class ProducerSettingsImpl[F[_], K, V](
     override val keySerializer: Resource[F, KeySerializer[F, K]],
     override val valueSerializer: Resource[F, ValueSerializer[F, V]],
     override val customBlockingContext: Option[ExecutionContext],
     override val properties: Map[String, String],
-    override val closeTimeout: FiniteDuration
+    override val closeTimeout: FiniteDuration,
+    override val failFastProduce: Boolean
   ) extends ProducerSettings[F, K, V] {
 
     override def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] =
@@ -301,6 +327,9 @@ object ProducerSettings {
     ): ProducerSettings[F, K1, V1] =
       copy(keySerializer = keySerializer, valueSerializer = valueSerializer)
 
+    override def withFailFastProduce(failFastProduce: Boolean): ProducerSettings[F, K, V] =
+      copy(failFastProduce = failFastProduce)
+
   }
 
   private[this] def create[F[_], K, V](
@@ -314,7 +343,8 @@ object ProducerSettings {
       properties = Map(
         ProducerConfig.RETRIES_CONFIG -> "0"
       ),
-      closeTimeout = 60.seconds
+      closeTimeout = 60.seconds,
+      failFastProduce = false
     )
 
   def apply[F[_], K, V](
diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
index a3a920563..6c23a15ae 100644
--- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -7,6 +7,7 @@ package fs2.kafka
 
 import scala.annotation.nowarn
+import scala.concurrent.Promise
 
 import cats.effect.{Async, Outcome, Resource}
 import cats.effect.syntax.all.*
@@ -141,11 +142,35 @@ object TransactionalKafkaProducer {
     ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
       withProducer.exclusiveAccess { (producer, blocking) =>
         blocking(producer.beginTransaction()).bracketCase { _ =>
-          val produce = records
-            .traverse(
-              KafkaProducer.produceRecord(keySerializer, valueSerializer, producer, blocking)
-            )
-            .flatMap(_.sequence)
+          def produceRecords(produceRecordError: Option[Promise[Throwable]]) =
+            records
+              .traverse(
+                KafkaProducer.produceRecord(
+                  keySerializer,
+                  valueSerializer,
+                  producer,
+                  blocking,
+                  produceRecordError
+                )
+              )
+              .flatMap(_.sequence)
+
+          val produce =
+            if (settings.producerSettings.failFastProduce)
+              Async[F]
+                .delay(Promise[Throwable]())
+                .flatMap { produceRecordError =>
+                  Async[F]
+                    .race(
+                      Async[F].fromFutureCancelable(
+                        Async[F].delay((produceRecordError.future, Async[F].unit))
+                      ),
+                      produceRecords(produceRecordError.some)
+                    )
+                    .rethrow
+                }
+            else
+              produceRecords(None)
 
           sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
         } {
diff --git a/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala b/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
index 8db7d2ab4..d6e883786 100644
--- a/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
@@ -17,8 +17,16 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
  * the fs2-kafka `KafkaProducer`. This is needed in order to instantiate
  * [[fs2.kafka.KafkaProducer]] and [[fs2.kafka.TransactionalKafkaProducer]].
  *
- * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However this
- * behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
+ * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However,
+ * this behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
  * instance in lexical scope.
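+ *
+ * For example, a rough sketch of a test override (the stub body here is hypothetical):
+ * {{{
+ * implicit val mk: MkProducer[IO] = new MkProducer[IO] {
+ *   def apply[G[_]](settings: ProducerSettings[G, ?, ?]): IO[KafkaByteProducer] =
+ *     IO.delay(???) // return a stubbed or instrumented Java producer
+ * }
+ * }}}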
  */
 trait MkProducer[F[_]] {
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
index 9f80fd76c..484ce3a9c 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
@@ -11,6 +11,9 @@
 import cats.effect.IO
 import cats.syntax.all.*
 
 import fs2.{Chunk, Stream}
 
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.errors.TimeoutException
+
 final class KafkaProducerSpec extends BaseKafkaSpec {
 
   describe("creating producers") {
@@ -259,6 +262,28 @@ final class KafkaProducerSpec extends BaseKafkaSpec {
       }
     }
 
+    it("should fail fast when producing multiple records") {
+      withTopic { topic =>
+        val nonExistentTopic = s"non-existent-$topic"
+        val toProduce        = (0 until 1000).map(n => s"key-$n" -> s"value-$n").toList
+        val settings = producerSettings[IO]
+          .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500")
+          .withFailFastProduce(true)
+
+        val error = intercept[TimeoutException] {
+          (for {
+            producer <- KafkaProducer.stream(settings)
+            records = ProducerRecords(toProduce.map { case (key, value) =>
+                        ProducerRecord(nonExistentTopic, key, value)
+                      })
+            result <- Stream.eval(producer.produce(records).flatten)
+          } yield result).compile.lastOrError.unsafeRunSync()
+        }
+
+        error.getMessage shouldBe s"Topic $nonExistentTopic not present in metadata after 500 ms."
+      }
+    }
+
     it("should get metrics") {
       withTopic { topic =>
         createCustomTopic(topic, partitions = 3)
diff --git a/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala b/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
index 46664e007..b9793ba9e 100644
--- a/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
@@ -168,8 +168,16 @@ final class ProducerSettingsSpec extends BaseSpec {
         )
       }
     }
+
+    it("should provide failFastProduce default value") {
+      assert(settings.failFastProduce == false)
+    }
+
+    it("should be able to set failFastProduce") {
+      assert(settings.withFailFastProduce(true).failFastProduce == true)
+    }
   }
 
-  val settings = ProducerSettings[IO, String, String]
+  private val settings = ProducerSettings[IO, String, String]
 
 }
diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
index 6ee29d802..fb87d28e9 100644
--- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -6,7 +6,9 @@
 
 package fs2.kafka
 
+import java.nio.charset.StandardCharsets
 import java.util
+import java.util.concurrent.{CompletableFuture, Future}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.duration.*
@@ -19,6 +21,8 @@
 import fs2.kafka.internal.converters.collection.*
 import fs2.kafka.producer.MkProducer
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
+import org.apache.kafka.clients.producer
+import org.apache.kafka.clients.producer.{Callback, ProducerConfig, RecordMetadata}
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.TopicPartition
@@ -401,6 +405,91 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
     }
   }
 
+  it("should fail fast when producing multiple records") {
+    val (key0, value0) = "key-0" -> "value-0"
+    val (key1, value1) = "key-1" -> "value-1"
+    val (key2, value2) = "key-2" -> "value-2"
+    var transactionAborted          = false
+    val expectedErrorOnSecondRecord = new RuntimeException("~Failed to produce second record~")
+
+    withTopic { topic =>
+      createCustomTopic(topic)
+
+      implicit val mk: MkProducer[IO] = new MkProducer[IO] {
+
+        def apply[G[_]](settings: ProducerSettings[G, ?, ?]): IO[KafkaByteProducer] =
+          IO.delay {
+            new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
+              (settings.properties: Map[String, AnyRef]).asJava,
+              new ByteArraySerializer,
+              new ByteArraySerializer
+            ) {
+
+              override def send(
+                record: producer.ProducerRecord[Array[Byte], Array[Byte]],
+                callback: Callback
+              ): Future[RecordMetadata] = {
+                val key = new String(record.key(), StandardCharsets.UTF_8)
+                val futureResult = CompletableFuture
+                  .completedFuture(new RecordMetadata(new TopicPartition(topic, 0), 0, 0, 0, 0, 0))
+
+                key match {
+                  case `key0` => futureResult
+
+                  case `key1` =>
+                    callback.onCompletion(null, expectedErrorOnSecondRecord)
+                    Thread.sleep(500) // ensure the callback completes and fail-fast is triggered
+                    futureResult.completeExceptionally(expectedErrorOnSecondRecord)
+                    futureResult
+
+                  case key =>
+                    fail(s"Unexpected key: $key, no record should be produced after key $key1 fails.")
+                }
+              }
+
+              override def abortTransaction(): Unit = {
+                transactionAborted = true
+                super.abortTransaction()
+              }
+
+            }
+          }
+
+      }
+
+      val producerRecords = List(
+        ProducerRecord(topic, key0, value0),
+        ProducerRecord(topic, key1, value1),
+        ProducerRecord(topic, key2, value2)
+      )
+      val committableOffset = CommittableOffset[IO](
+        new TopicPartition("topic-consumer", 0),
+        new OffsetAndMetadata(0),
+        Some("consumer-group"),
+        _ => IO.raiseError(new RuntimeException("Commit should not be called")).void
+      )
+      val committable = CommittableProducerRecords(producerRecords, committableOffset)
+
+      val settings = TransactionalProducerSettings(
+        transactionalId = s"fail-fast-$topic",
+        producerSettings = producerSettings[IO]
+          .withRetries(Int.MaxValue)
+          .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+          .withFailFastProduce(true)
+      )
+
+      val result = intercept[RuntimeException] {
+        TransactionalKafkaProducer
+          .stream(settings)
+          .evalMap(_.produce(Chunk.singleton(committable)))
+          .compile
+          .lastOrError
+          .unsafeRunSync()
+      }
+
+      result shouldBe expectedErrorOnSecondRecord
+      assert(transactionAborted, "The transaction should be aborted")
+    }
+  }
+
   it("should get metrics") {
     withTopic { topic =>
       createCustomTopic(topic, partitions = 3)