Merge remote-tracking branch 'origin/series/2.x' into series/3.x
bplommer committed Mar 9, 2022
2 parents 90dc28f + 585d714 commit 0dd576b
Showing 9 changed files with 365 additions and 133 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -10,7 +10,7 @@ val kafkaVersion = "3.0.0"

val testcontainersScalaVersion = "0.40.0"

val vulcanVersion = "1.7.1"
val vulcanVersion = "1.8.0"

val munitVersion = "0.7.29"

5 changes: 5 additions & 0 deletions docs/src/main/mdoc/transactions.md
@@ -13,6 +13,11 @@ Kafka transactions are supported through a [`TransactionalKafkaProducer`][transa

- Create `CommittableProducerRecords` and wrap them in `TransactionalProducerRecords`.

> Note that calls to `produce` are sequenced in the `TransactionalKafkaProducer` so that, when it is used concurrently, transactions cannot interleave and trigger an invalid transaction transition exception.
>
> Because the `TransactionalKafkaProducer` waits for the record batch to be flushed and the transaction to be committed on the broker, sharing a single producer among many threads can become a performance bottleneck.
> If you need concurrent transactional produces, it is recommended that you create a pool of transactional producers, as sketched below.
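
A minimal sketch of such a pool, assuming cats-effect 3 and the fs2-kafka 3.x API. The pool size, transactional ids, bootstrap servers, and the helper names `producerPool` and `withPooledProducer` are illustrative only; note that each pooled producer must be given a distinct transactional id, otherwise the broker will fence all but the most recently initialized one.

```scala
import cats.effect.{IO, Resource}
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.kafka._

// Creates `poolSize` transactional producers, each with its own transactional id,
// and exposes them through a queue acting as a simple pool.
def producerPool(
  baseSettings: ProducerSettings[IO, String, String],
  poolSize: Int
): Resource[IO, Queue[IO, TransactionalKafkaProducer[IO, String, String]]] =
  (0 until poolSize).toVector
    .traverse { i =>
      TransactionalKafkaProducer.resource(
        TransactionalProducerSettings(s"example-transactional-id-$i", baseSettings)
      )
    }
    .evalMap { producers =>
      Queue
        .unbounded[IO, TransactionalKafkaProducer[IO, String, String]]
        .flatTap(pool => producers.traverse_(pool.offer))
    }

// Borrows a producer for a single transaction and returns it to the pool afterwards.
def withPooledProducer[A](
  pool: Queue[IO, TransactionalKafkaProducer[IO, String, String]]
)(use: TransactionalKafkaProducer[IO, String, String] => IO[A]): IO[A] =
  pool.take.bracket(use)(pool.offer)
```
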
Following is an example where transactions are used to consume, process, produce, and commit.

```scala mdoc
11 changes: 10 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/RecordDeserializer.scala
@@ -6,7 +6,8 @@

package fs2.kafka

import cats.Applicative
import cats.syntax.all._
import cats.{Applicative, Functor}

/**
* Deserializer which may vary depending on whether a record
@@ -17,6 +18,14 @@ sealed abstract class RecordDeserializer[F[_], A] {
def forKey: F[Deserializer[F, A]]

def forValue: F[Deserializer[F, A]]

/**
* Returns a new [[RecordDeserializer]] instance that will catch deserialization
* errors and return them as a value, allowing user code to handle them without
* causing the consumer to fail.
*/
final def attempt(implicit F: Functor[F]): RecordDeserializer[F, Either[Throwable, A]] =
RecordDeserializer.instance(forKey.map(_.attempt), forValue.map(_.attempt))
}

object RecordDeserializer {
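
As a usage sketch of the new `attempt` combinator: assuming the fs2-kafka vulcan module is on the classpath, and using an illustrative `Customer` type, codec, and schema-registry URL, deserialization failures surface as `Left` values instead of failing the consumer stream.

```scala
import cats.effect.IO
import fs2.kafka._
import fs2.kafka.vulcan.{avroDeserializer, AvroSettings, SchemaRegistryClientSettings}
import vulcan.Codec

// Illustrative record type and Avro codec; in a real project these would
// already exist alongside your schema-registry configuration.
final case class Customer(name: String)

implicit val customerCodec: Codec[Customer] =
  Codec.record(name = "Customer", namespace = "com.example") { field =>
    field("name", _.name).map(Customer(_))
  }

val avroSettings: AvroSettings[IO] =
  AvroSettings(SchemaRegistryClientSettings[IO]("http://localhost:8081"))

// `attempt` turns deserialization errors into Left values, so a malformed
// record no longer fails the consumer.
implicit val customerDeserializer: RecordDeserializer[IO, Either[Throwable, Customer]] =
  avroDeserializer[Customer].using(avroSettings).attempt

val consumerSettings: ConsumerSettings[IO, String, Either[Throwable, Customer]] =
  ConsumerSettings[IO, String, Either[Throwable, Customer]]
    .withBootstrapServers("localhost:9092")
    .withGroupId("customers")
```

Downstream, each record value is an `Either[Throwable, Customer]` that user code can pattern match on, for example routing `Left` values to a dead-letter topic.
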
modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -58,6 +58,22 @@ object TransactionalKafkaProducer {
def metrics: F[Map[MetricName, Metric]]
}

/**
* [[TransactionalKafkaProducer.WithoutOffsets]] extends [[TransactionalKafkaProducer.Metrics]]
* to allow producing records without corresponding upstream offsets.
*/
abstract class WithoutOffsets[F[_], K, V] extends Metrics[F, K, V] {

/**
* Produces the `ProducerRecord`s in the specified [[ProducerRecords]]
* in three steps: first a transaction is initialized, then the records are placed
* in the buffer of the producer, and lastly the transaction is committed. If an error
* occurs or the effect is cancelled, the transaction is aborted. The returned effect
* succeeds if the whole transaction completes successfully.
*/
def produceWithoutOffsets[P](records: ProducerRecords[P, K, V]): F[ProducerResult[P, K, V]]
}

/**
* Creates a new [[TransactionalKafkaProducer]] in the `Resource` context,
* using the specified [[TransactionalProducerSettings]]. Note that there
@@ -73,20 +89,20 @@
)(
implicit F: Async[F],
mk: MkProducer[F]
): Resource[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
): Resource[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
(
Resource.eval(settings.producerSettings.keySerializer),
Resource.eval(settings.producerSettings.valueSerializer),
WithProducer(mk, settings)
WithTransactionalProducer(mk, settings)
).mapN { (keySerializer, valueSerializer, withProducer) =>
new TransactionalKafkaProducer.Metrics[F, K, V] {
new TransactionalKafkaProducer.WithoutOffsets[F, K, V] {
override def produce[P](
records: TransactionalProducerRecords[F, P, K, V]
): F[ProducerResult[P, K, V]] =
produceTransaction(records)
produceTransactionWithOffsets(records)
.map(ProducerResult(_, records.passthrough))

private[this] def produceTransaction[P](
private[this] def produceTransactionWithOffsets[P](
records: TransactionalProducerRecords[F, P, K, V]
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
if (records.records.isEmpty) F.pure(Chunk.empty)
@@ -100,34 +116,50 @@
else F.pure(batch.consumerGroupIds.head)

consumerGroupId.flatMap { groupId =>
withProducer { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
records.records
.flatMap(_.records)
.traverse(
KafkaProducer
.produceRecord(keySerializer, valueSerializer, producer, blocking)
)
.map(_.sequence)
.flatTap { _ =>
blocking {
producer.sendOffsetsToTransaction(
batch.offsets.asJava,
new ConsumerGroupMetadata(groupId)
)
}
}
} {
case (_, Outcome.Succeeded(_)) =>
blocking(producer.commitTransaction())
case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
blocking(producer.abortTransaction())
}
}.flatten
val sendOffsets: (KafkaByteProducer, Blocking[F]) => F[Unit] = (producer, blocking) =>
blocking {
producer.sendOffsetsToTransaction(
batch.offsets.asJava,
new ConsumerGroupMetadata(groupId)
)
}

produceTransaction(records.records.flatMap(_.records), Some(sendOffsets))
}
}

override def produceWithoutOffsets[P](
records: ProducerRecords[P, K, V]
): F[ProducerResult[P, K, V]] =
produceTransaction(records.records, None).map(ProducerResult(_, records.passthrough))

private[this] def produceTransaction[P](
records: Chunk[ProducerRecord[K, V]],
sendOffsets: Option[(KafkaByteProducer, Blocking[F]) => F[Unit]]
): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
if (records.isEmpty) F.pure(Chunk.empty)
else {

withProducer.exclusiveAccess { (producer, blocking) =>
blocking(producer.beginTransaction())
.bracketCase { _ =>
val produce = records
.traverse(
KafkaProducer
.produceRecord(keySerializer, valueSerializer, producer, blocking)
)
.map(_.sequence)

sendOffsets.fold(produce)(f => produce.flatTap(_ => f(producer, blocking)))
} {
case (_, Outcome.Succeeded(_)) =>
blocking(producer.commitTransaction())
case (_, Outcome.Canceled() | Outcome.Errored(_)) =>
blocking(producer.abortTransaction())
}
}.flatten
}

override def metrics: F[Map[MetricName, Metric]] =
withProducer.blocking { _.metrics().asScala.toMap }

@@ -151,7 +183,7 @@
)(
implicit F: Async[F],
mk: MkProducer[F]
): Stream[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
): Stream[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
Stream.resource(resource(settings))

def apply[F[_]]: TransactionalProducerPartiallyApplied[F] =
@@ -173,7 +205,7 @@
def resource[K, V](settings: TransactionalProducerSettings[F, K, V])(
implicit F: Async[F],
mk: MkProducer[F]
): Resource[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
): Resource[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
TransactionalKafkaProducer.resource(settings)

/**
@@ -189,7 +221,7 @@
def stream[K, V](settings: TransactionalProducerSettings[F, K, V])(
implicit F: Async[F],
mk: MkProducer[F]
): Stream[F, TransactionalKafkaProducer.Metrics[F, K, V]] =
): Stream[F, TransactionalKafkaProducer.WithoutOffsets[F, K, V]] =
TransactionalKafkaProducer.stream(settings)

override def toString: String =
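
A usage sketch of the new `produceWithoutOffsets` method exposed by `TransactionalKafkaProducer.WithoutOffsets`; the topic, transactional id, and bootstrap servers below are illustrative.

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object ProduceWithoutOffsetsExample extends IOApp.Simple {

  val settings: TransactionalProducerSettings[IO, String, String] =
    TransactionalProducerSettings(
      "example-transactional-id",
      ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")
    )

  val records: ProducerRecords[Unit, String, String] =
    ProducerRecords(
      List(
        ProducerRecord("example-topic", "key-1", "value-1"),
        ProducerRecord("example-topic", "key-2", "value-2")
      )
    )

  val run: IO[Unit] =
    TransactionalKafkaProducer
      .resource(settings)
      .use { producer =>
        // Both records are written atomically within a single transaction,
        // without sending any consumer group offsets as part of it.
        producer.produceWithoutOffsets(records).void
      }
}
```
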
28 changes: 2 additions & 26 deletions modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
@@ -6,11 +6,10 @@

package fs2.kafka.internal

import fs2.kafka.producer.MkProducer
import cats.effect.{Async, Resource}
import cats.syntax.all._
import fs2.kafka.{KafkaByteProducer, ProducerSettings, TransactionalProducerSettings}
import fs2.kafka.{KafkaByteProducer, ProducerSettings}
import scala.jdk.DurationConverters._
import fs2.kafka.producer.MkProducer

private[kafka] sealed abstract class WithProducer[F[_]] {
def apply[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A]
@@ -40,29 +39,6 @@ private[kafka] object WithProducer {
.map(create(_, blockingG))
}

def apply[F[_], K, V](
mk: MkProducer[F],
settings: TransactionalProducerSettings[F, K, V]
)(
implicit F: Async[F]
): Resource[F, WithProducer[F]] =
Resource[F, WithProducer[F]] {
mk(settings.producerSettings).flatMap { producer =>
val blocking = settings.producerSettings.customBlockingContext
.fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)

val withProducer = create(producer, blocking)

val initTransactions = withProducer.blocking { _.initTransactions() }

val close = withProducer.blocking {
_.close(settings.producerSettings.closeTimeout.toJava)
}

initTransactions.as((withProducer, close))
}
}

private def create[F[_]](
producer: KafkaByteProducer,
_blocking: Blocking[F]
modules/core/src/main/scala/fs2/kafka/internal/WithTransactionalProducer.scala
@@ -0,0 +1,74 @@
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka.internal

import cats.effect.std.Semaphore
import cats.effect.{Async, MonadCancelThrow, Resource}
import cats.implicits._
import scala.jdk.DurationConverters._
import fs2.kafka.producer.MkProducer
import fs2.kafka.{KafkaByteProducer, TransactionalProducerSettings}

private[kafka] sealed abstract class WithTransactionalProducer[F[_]] {
def apply[A](f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]): F[A]

def exclusiveAccess[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A] = apply {
case (producer, blocking, exclusive) => exclusive(f(producer, blocking))
}

def blocking[A](f: KafkaByteProducer => A): F[A] = apply {
case (producer, blocking, _) => blocking(f(producer))
}
}

private[kafka] object WithTransactionalProducer {
def apply[F[_], K, V](
mk: MkProducer[F],
settings: TransactionalProducerSettings[F, K, V]
)(
implicit F: Async[F]
): Resource[F, WithTransactionalProducer[F]] =
Resource[F, WithTransactionalProducer[F]] {
(mk(settings.producerSettings), Semaphore(1)).tupled.flatMap {
case (producer, semaphore) =>
val blocking = settings.producerSettings.customBlockingContext
.fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)

val withProducer = create(producer, blocking, semaphore)

val initTransactions = withProducer.blocking { _.initTransactions() }

/*
Deliberately does not use the exclusive access functionality to close the producer. The close method on
the underlying client waits until the buffer has been flushed to the broker or the timeout is exceeded.
Because the transactional producer _always_ waits until the buffer is flushed and the transaction
committed on the broker before proceeding, upon gaining exclusive access to the producer the buffer will
always be empty. Therefore if we used exclusive access to close the underlying producer, the buffer
would already be empty and the close timeout setting would be redundant.
TLDR: not using exclusive access here preserves the behaviour of the underlying close method and timeout
setting
*/
val close = withProducer.blocking {
_.close(settings.producerSettings.closeTimeout.toJava)
}

initTransactions.as((withProducer, close))
}
}

private def create[F[_]: MonadCancelThrow](
producer: KafkaByteProducer,
_blocking: Blocking[F],
transactionSemaphore: Semaphore[F]
): WithTransactionalProducer[F] = new WithTransactionalProducer[F] {
override def apply[A](
f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]
): F[A] =
f(producer, _blocking, transactionSemaphore.permit.surround)
}
}
12 changes: 12 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/internal/package.scala
@@ -0,0 +1,12 @@
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/

package fs2.kafka

package object internal {
private[kafka] type ExclusiveAccess[F[_], A] = F[A] => F[A]

}
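
The `ExclusiveAccess` alias is simply a function `F[A] => F[A]`; in `WithTransactionalProducer` it is backed by a single-permit `Semaphore`, so effects wrapped by it never interleave. A standalone sketch of the same pattern, with illustrative names that are not part of the library API:

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._

object ExclusiveAccessExample extends IOApp.Simple {

  // Mirrors the internal alias: wrapping an effect grants it exclusive access.
  type ExclusiveAccess[A] = IO[A] => IO[A]

  val run: IO[Unit] =
    Semaphore[IO](1).flatMap { semaphore =>
      val exclusive: ExclusiveAccess[Unit] =
        io => semaphore.permit.surround(io)

      // The two "transactions" start concurrently, but exclusive access
      // guarantees their begin/commit sections never interleave.
      def transaction(name: String): IO[Unit] =
        exclusive {
          IO.println(s"$name: begin") *>
            IO.sleep(100.millis) *>
            IO.println(s"$name: commit")
        }

      (transaction("a"), transaction("b")).parTupled.void
    }
}
```
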
15 changes: 11 additions & 4 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -9,7 +9,11 @@ import cats.effect.unsafe.implicits.global
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException}
import org.apache.kafka.clients.consumer.{
ConsumerConfig,
CooperativeStickyAssignor,
NoOffsetForPartitionException
}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.scalatest.Assertion
@@ -559,7 +563,9 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
.stream(
consumerSettings[IO]
.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
CooperativeStickyAssignor
].getName
)
)
.subscribeTo(topic)
@@ -801,9 +807,10 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
.stream(
consumerSettings[IO]
.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
CooperativeStickyAssignor
].getName
)

)
.subscribeTo(topic)
.evalMap { consumer =>