diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
index 8cd5770d7..2d38059e0 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -172,19 +172,38 @@ object KafkaProducer {
withProducer: WithProducer[F],
keySerializer: KeySerializer[F, K],
valueSerializer: ValueSerializer[F, V],
- records: ProducerRecords[K, V]
+ records: ProducerRecords[K, V],
+ failFastProduce: Boolean
): F[F[ProducerResult[K, V]]] =
withProducer { (producer, blocking) =>
- records
- .traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
- .map(_.sequence)
+ def produceRecords(produceRecordError: Option[Promise[Throwable]]) =
+ records
+ .traverse(
+ produceRecord(keySerializer, valueSerializer, producer, blocking, produceRecordError)
+ )
+ .map(_.sequence)
+
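+ // Fail-fast path: race completion of the shared error Promise (failed by the
+ // first errored send callback) against normal production, so the first failure
+ // cancels the remaining sends and is rethrown.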
+ if (failFastProduce)
+ Async[F]
+ .delay(Promise[Throwable]())
+ .flatMap { produceRecordError =>
+ Async[F]
+ .race(
+ Async[F]
+ .fromFutureCancelable(Async[F].delay((produceRecordError.future, Async[F].unit))),
+ produceRecords(produceRecordError.some)
+ )
+ .rethrow
+ }
+ else produceRecords(None)
}
private[kafka] def produceRecord[F[_], K, V](
keySerializer: KeySerializer[F, K],
valueSerializer: ValueSerializer[F, V],
producer: KafkaByteProducer,
- blocking: Blocking[F]
+ blocking: Blocking[F],
+ produceRecordError: Option[Promise[Throwable]]
)(implicit
F: Async[F]
): ProducerRecord[K, V] => F[F[(ProducerRecord[K, V], RecordMetadata)]] =
@@ -196,9 +215,11 @@ object KafkaProducer {
producer.send(
javaRecord,
{ (metadata, exception) =>
- if (exception == null)
- promise.success((record, metadata))
- else promise.failure(exception)
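+ // The shared fail-fast Promise may receive several failing callbacks; tryFailure
+ // completes it at most once instead of throwing IllegalStateException.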
+ if (exception == null) { promise.success((record, metadata)) }
+ else {
+ promise.failure(exception)
+ produceRecordError.foreach(_.tryFailure(exception))
+ }
}
)
}.map(javaFuture =>
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
index 5e60eac49..666cf0e6a 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducerConnection.scala
@@ -131,7 +131,8 @@ object KafkaProducerConnection {
withProducer,
keySerializer,
valueSerializer,
- records
+ records,
+ settings.failFastProduce
)
override def metrics: G[Map[MetricName, Metric]] =
diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
index 8c841c044..e36c1cea7 100644
--- a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
@@ -215,6 +215,23 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings[F, K, V]
+ /**
+ * Controls whether [[fs2.kafka.KafkaProducer.produce]] fails immediately when any
+ * [[org.apache.kafka.clients.producer.KafkaProducer.send]] callback completes with an error.
+ *
+ * When set to `true`, the `produce` method fails fast, returning an error as soon as any
+ * record in the [[ProducerRecords]] fails to be sent.
+ *
+ * The default value is `false`, in which case `produce` does not fail fast and keeps
+ * sending the remaining records even if some callbacks fail.
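+ *
+ * A minimal sketch of enabling the flag (assuming an `IO`-based settings value with
+ * implicit `String` serializers; the bootstrap server is illustrative):
+ * {{{
+ * val settings =
+ *   ProducerSettings[IO, String, String]
+ *     .withBootstrapServers("localhost:9092")
+ *     .withFailFastProduce(true)
+ * }}}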
+ */
+ def failFastProduce: Boolean
+
+ /**
+ * Creates a new [[ProducerSettings]] with the specified [[failFastProduce]].
+ */
+ def withFailFastProduce(failFastProduce: Boolean): ProducerSettings[F, K, V]
+
}
object ProducerSettings {
@@ -224,7 +241,8 @@ object ProducerSettings {
override val valueSerializer: Resource[F, ValueSerializer[F, V]],
override val customBlockingContext: Option[ExecutionContext],
override val properties: Map[String, String],
- override val closeTimeout: FiniteDuration
+ override val closeTimeout: FiniteDuration,
+ override val failFastProduce: Boolean
) extends ProducerSettings[F, K, V] {
override def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] =
@@ -301,6 +319,9 @@ object ProducerSettings {
): ProducerSettings[F, K1, V1] =
copy(keySerializer = keySerializer, valueSerializer = valueSerializer)
+ override def withFailFastProduce(failFastProduce: Boolean): ProducerSettings[F, K, V] =
+ copy(failFastProduce = failFastProduce)
+
}
private[this] def create[F[_], K, V](
@@ -314,7 +335,8 @@ object ProducerSettings {
properties = Map(
ProducerConfig.RETRIES_CONFIG -> "0"
),
- closeTimeout = 60.seconds
+ closeTimeout = 60.seconds,
+ failFastProduce = false
)
def apply[F[_], K, V](
diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
index a3a920563..6c23a15ae 100644
--- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -7,6 +7,7 @@
package fs2.kafka
import scala.annotation.nowarn
+import scala.concurrent.Promise
import cats.effect.{Async, Outcome, Resource}
import cats.effect.syntax.all.*
@@ -141,11 +142,35 @@ object TransactionalKafkaProducer {
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
withProducer.exclusiveAccess { (producer, blocking) =>
blocking(producer.beginTransaction()).bracketCase { _ =>
- val produce = records
- .traverse(
- KafkaProducer.produceRecord(keySerializer, valueSerializer, producer, blocking)
- )
- .flatMap(_.sequence)
+ def produceRecords(produceRecordError: Option[Promise[Throwable]]) =
+ records
+ .traverse(
+ KafkaProducer.produceRecord(
+ keySerializer,
+ valueSerializer,
+ producer,
+ blocking,
+ produceRecordError
+ )
+ )
+ .flatMap(_.sequence)
+
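+ // Same fail-fast race as in KafkaProducer.produce: the first failed send callback
+ // wins the race, failing `produce` so the surrounding bracketCase aborts the
+ // transaction.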
+ val produce =
+ if (settings.producerSettings.failFastProduce)
+ Async[F]
+ .delay(Promise[Throwable]())
+ .flatMap { produceRecordError =>
+ Async[F]
+ .race(
+ Async[F].fromFutureCancelable(
+ Async[F].delay((produceRecordError.future, Async[F].unit))
+ ),
+ produceRecords(produceRecordError.some)
+ )
+ .rethrow
+ }
+ else
+ produceRecords(None)
sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
} {
diff --git a/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala b/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
index 8db7d2ab4..d6e883786 100644
--- a/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/producer/MkProducer.scala
@@ -17,8 +17,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
* the fs2-kafka `KafkaProducer`. This is needed in order to instantiate
* [[fs2.kafka.KafkaProducer]] and [[fs2.kafka.TransactionalKafkaProducer]].
*
- * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However this
- * behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
+ * By default, the instance provided by [[MkProducer.mkProducerForSync]] will be used. However,
+ * this behaviour can be overridden, e.g. for testing purposes, by placing an alternative implicit
* instance in lexical scope.
*/
trait MkProducer[F[_]] {
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
index 9f80fd76c..484ce3a9c 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
@@ -11,6 +11,9 @@ import cats.effect.IO
import cats.syntax.all.*
import fs2.{Chunk, Stream}
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.errors.TimeoutException
+
final class KafkaProducerSpec extends BaseKafkaSpec {
describe("creating producers") {
@@ -259,6 +262,28 @@ final class KafkaProducerSpec extends BaseKafkaSpec {
}
}
+ it("should fail fast to produce records with multiple") {
+ withTopic { topic =>
+ val nonExistentTopic = s"non-existent-$topic"
+ val toProduce = (0 until 1000).map(n => s"key-$n" -> s"value-$n").toList
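+ // MAX_BLOCK_MS bounds how long send() blocks waiting for metadata of the
+ // non-existent topic, so the expected TimeoutException surfaces within 500 ms.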
+ val settings = producerSettings[IO]
+ .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500")
+ .withFailFastProduce(true)
+
+ val error = intercept[TimeoutException] {
+ (for {
+ producer <- KafkaProducer.stream(settings)
+ records = ProducerRecords(toProduce.map { case (key, value) =>
+ ProducerRecord(nonExistentTopic, key, value)
+ })
+ result <- Stream.eval(producer.produce(records).flatten)
+ } yield result).compile.lastOrError.unsafeRunSync()
+ }
+
+ error.getMessage shouldBe s"Topic $nonExistentTopic not present in metadata after 500 ms."
+ }
+ }
+
it("should get metrics") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
diff --git a/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala b/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
index 46664e007..b9793ba9e 100644
--- a/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
@@ -168,8 +168,16 @@ final class ProducerSettingsSpec extends BaseSpec {
)
}
}
+
+ it("should provide failFastProduce default value") {
+ assert(!settings.failFastProduce)
+ }
+
+ it("should be able to set failFastProduce") {
+ assert(settings.withFailFastProduce(true).failFastProduce)
+ }
}
- val settings = ProducerSettings[IO, String, String]
+ private val settings = ProducerSettings[IO, String, String]
}
diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
index 6ee29d802..fb87d28e9 100644
--- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala
@@ -6,7 +6,9 @@
package fs2.kafka
+import java.nio.charset.StandardCharsets
import java.util
+import java.util.concurrent.{CompletableFuture, Future}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration.*
@@ -19,6 +21,8 @@ import fs2.kafka.internal.converters.collection.*
import fs2.kafka.producer.MkProducer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
+import org.apache.kafka.clients.producer
+import org.apache.kafka.clients.producer.{Callback, ProducerConfig, RecordMetadata}
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.TopicPartition
@@ -401,6 +405,91 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
}
}
+ it("should fail fast to produce records with multiple") {
+ val (key0, value0) = "key-0" -> "value-0"
+ val (key1, value1) = "key-1" -> "value-1"
+ val (key2, value2) = "key-2" -> "value-2"
+ var transactionAborted = false
+ val expectedErrorOnSecondRecord = new RuntimeException("~Failed to produce second record~")
+
+ withTopic { topic =>
+ createCustomTopic(topic)
+
+ implicit val mk: MkProducer[IO] = new MkProducer[IO] {
+
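+ // Stub producer: key0 succeeds, key1 fails via its callback, and any later send
+ // (key2) fails the test, since fail-fast should have cancelled it.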
+ def apply[G[_]](settings: ProducerSettings[G, ?, ?]): IO[KafkaByteProducer] =
+ IO.delay {
+ new org.apache.kafka.clients.producer.KafkaProducer[Array[Byte], Array[Byte]](
+ (settings.properties: Map[String, AnyRef]).asJava,
+ new ByteArraySerializer,
+ new ByteArraySerializer
+ ) {
+ override def send(
+ record: producer.ProducerRecord[Array[Byte], Array[Byte]],
+ callback: Callback
+ ): Future[RecordMetadata] = {
+ val key = new String(record.key(), StandardCharsets.UTF_8)
+ val futureResult = CompletableFuture
+ .completedFuture(new RecordMetadata(new TopicPartition(topic, 0), 0, 0, 0, 0, 0))
+
+ key match {
+ case `key0` => futureResult
+
+ case `key1` =>
+ callback.onCompletion(null, expectedErrorOnSecondRecord)
+ Thread.sleep(500) // give the fail-fast race time to observe the failure before send returns
+ val failedResult = new CompletableFuture[RecordMetadata]()
+ failedResult.completeExceptionally(expectedErrorOnSecondRecord)
+ failedResult
+
+ case key =>
+ fail(s"Unexpected key: $key, the producer should not produce any record after key $key1.")
+ }
+ }
+
+ override def abortTransaction(): Unit = {
+ transactionAborted = true
+ super.abortTransaction()
+ }
+ }
+ }
+
+ }
+
+ val producerRecords = List(
+ ProducerRecord(topic, key0, value0),
+ ProducerRecord(topic, key1, value1),
+ ProducerRecord(topic, key2, value2)
+ )
+ val committableOffset = CommittableOffset[IO](
+ new TopicPartition("topic-consumer", 0),
+ new OffsetAndMetadata(0),
+ Some("consumer-group"),
+ _ => IO.raiseError(new RuntimeException("Commit should not be called")).void
+ )
+ val committable = CommittableProducerRecords(producerRecords, committableOffset)
+
+ val settings = TransactionalProducerSettings(
+ transactionalId = s"fail-fast-$topic",
+ producerSettings = producerSettings[IO]
+ .withRetries(Int.MaxValue)
+ .withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+ .withFailFastProduce(true)
+ )
+
+ val result = intercept[RuntimeException] {
+ TransactionalKafkaProducer
+ .stream(settings)
+ .evalMap(_.produce(Chunk.singleton(committable)))
+ .compile
+ .lastOrError
+ .unsafeRunSync()
+ }
+
+ result shouldBe expectedErrorOnSecondRecord
+ assert(transactionAborted, "The transaction should be aborted")
+ }
+ }
+
it("should get metrics") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)