Merge pull request #823 from zcox/sendff

KAYAK-3355 Send with nested effects

zcox authored Dec 28, 2023
2 parents 722cb99 + 27db618 commit cf40871
Showing 4 changed files with 294 additions and 5 deletions.
12 changes: 11 additions & 1 deletion build.sbt
@@ -65,7 +65,17 @@ lazy val core = project
mimaBinaryIssueFilters ++= {
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.core.ProblemFilters._
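// MiMa exclusions for intentional binary-incompatible changes in this PR:
// ProducerApi.mapK's signature changed (it now requires a Functor[G]), and
// ProducerApi gained the new abstract send method.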
Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.banno.kafka.producer.ProducerApi.mapK"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"com.banno.kafka.producer.ProducerApi.send"
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.banno.kafka.producer.ProducerImpl.mapK"
),
)
},
)
.settings(
10 changes: 9 additions & 1 deletion core/src/main/scala/com/banno/kafka/producer/ProducerApi.scala
@@ -50,6 +50,8 @@ trait ProducerApi[F[_], K, V] {
def sendSync(record: ProducerRecord[K, V]): F[RecordMetadata]
def sendAsync(record: ProducerRecord[K, V]): F[RecordMetadata]

/** Sends the record with nested effects: the outer effect completes once the
* producer has buffered the record, and the inner effect completes once
* Kafka has acknowledged the write. See ProducerImpl.send for the full
* semantics.
*/
def send(record: ProducerRecord[K, V]): F[F[RecordMetadata]]

// Cats doesn't have `Bicontravariant`
final def contrabimap[A, B](f: A => K, g: B => V): ProducerApi[F, A, B] = {
val self = this
@@ -84,6 +86,8 @@ trait ProducerApi[F[_], K, V] {
self.sendSync(record.bimap(f, g))
override def sendAsync(record: ProducerRecord[A, B]): F[RecordMetadata] =
self.sendAsync(record.bimap(f, g))
override def send(record: ProducerRecord[A, B]): F[F[RecordMetadata]] =
self.send(record.bimap(f, g))
}
}
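
A hypothetical sketch of contrabimap in use, adapting a string producer to domain types (UserId, UserEvent, and eventProducer are illustrative names, not part of this PR):

import cats.effect.IO
import com.banno.kafka.producer.ProducerApi

final case class UserId(value: String)
final case class UserEvent(payload: String)

// contrabimap adapts the key and value types accepted by the producer
def eventProducer(
strings: ProducerApi[IO, String, String]
): ProducerApi[IO, UserId, UserEvent] =
strings.contrabimap(_.value, _.payload)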

@@ -123,10 +127,12 @@ trait ProducerApi[F[_], K, V] {
record.bitraverse(f, g) >>= self.sendSync
override def sendAsync(record: ProducerRecord[A, B]): F[RecordMetadata] =
record.bitraverse(f, g) >>= self.sendAsync
override def send(record: ProducerRecord[A, B]): F[F[RecordMetadata]] =
record.bitraverse(f, g) >>= self.send
}
}

final def mapK[G[_]: Functor](f: F ~> G): ProducerApi[G, K, V] = {
val self = this
new ProducerApi[G, K, V] {
override def abortTransaction: G[Unit] = f(self.abortTransaction)
@@ -153,6 +159,8 @@ trait ProducerApi[F[_], K, V] {
f(self.sendSync(record))
override def sendAsync(record: ProducerRecord[K, V]): G[RecordMetadata] =
f(self.sendAsync(record))
override def send(record: ProducerRecord[K, V]): G[G[RecordMetadata]] =
f(self.send(record)).map(f(_))
}
}
}
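For context, a minimal sketch of using the new nested-effect send (the topic and names are illustrative, assuming a ProducerApi[IO, String, String] as defined above):

import cats.effect.IO
import com.banno.kafka.producer.ProducerApi
import org.apache.kafka.clients.producer.ProducerRecord

def sendAndAwait(producer: ProducerApi[IO, String, String]): IO[Unit] =
for {
ack <- producer.send(new ProducerRecord("topic", "key", "value")) // outer effect: record buffered
rm <- ack // inner effect: Kafka acknowledged the write
_ <- IO.println(s"wrote to ${rm.topic}-${rm.partition}@${rm.offset}")
} yield ()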
50 changes: 47 additions & 3 deletions core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
import org.apache.kafka.common._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer._
import scala.concurrent.Promise
import scala.util.Try

case class ProducerImpl[F[_], K, V](p: Producer[K, V])(implicit F: Async[F])
extends ProducerApi[F, K, V] {
@@ -70,6 +72,21 @@ case class ProducerImpl[F[_], K, V](p: Producer[K, V])(implicit F: Async[F])
Some(F.delay(jFuture.cancel(false)).void)
}

/** The outer effect sends the record on a blocking context and is cancelable.
* The inner effect cancels the underlying send.
*/
private def sendRaw2(
record: ProducerRecord[K, V],
callback: Try[RecordMetadata] => Unit,
): F[F[Unit]] =
// KafkaProducer.send should be interruptible via InterruptedException,
// so use F.interruptible instead of F.blocking or F.delay
F.interruptible(
sendRaw(
record,
{ (rm, e) => callback(Option(e).toLeft(rm).toTry) },
)
).map(jf => F.delay(jf.cancel(true)).void)

/** The returned F[_] completes as soon as the underlying
* Producer.send(record) call returns. This is immediately after the producer
* enqueues the record, not after Kafka accepts the write. If the producer's
@@ -95,17 +112,44 @@ case class ProducerImpl[F[_], K, V](p: Producer[K, V])(implicit F: Async[F])
* future.get() call throws an exception. You should use this method if your
* program should not proceed until Kafka accepts the write, or you need to
* use the RecordMetadata, or you need to explicitly handle any possible
* error. Note that traversing many records with this operation prevents the
* underlying producer from batching multiple records.
*/
def sendSync(record: ProducerRecord[K, V]): F[RecordMetadata] =
F.delay(sendRaw(record)).map(_.get())

/** Similar to sendSync, except the returned F[_] is completed asynchronously,
* usually on the producer's I/O thread. Note that traversing many records
* with this operation prevents the underlying producer from batching
* multiple records.
*/
def sendAsync(record: ProducerRecord[K, V]): F[RecordMetadata] =
F.async(sendRaw(record, _))

/** The outer effect completes synchronously when the underlying Producer.send
* call returns. This is immediately after the producer enqueues the record,
* not after Kafka accepts the write. If the producer's internal queue is
* full, it will block until the record can be enqueued (i.e. backpressure).
* The outer effect is executed on a blocking context and is cancelable. The
* outer effect will only contain an error if the Producer.send call throws
* an exception. The inner effect completes asynchronously after Kafka
* acknowledges the write, and the RecordMetadata is available. The inner
* effect will only contain an error if the write failed. The inner effect is
* also cancelable. With this operation, user code can react to both the
* producer's initial buffering of the record to be sent, and the final
* result of the write (either success or failure).
*/
def send(record: ProducerRecord[K, V]): F[F[RecordMetadata]] =
// inspired by https://github.com/fd4s/fs2-kafka/blob/series/3.x/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
F.delay(Promise[RecordMetadata]())
.flatMap { promise =>
// sendRaw2 buffers the record; its callback completes the promise
// when Kafka responds (or the send fails)
sendRaw2(record, promise.complete)
.map(cancel =>
// the inner effect awaits the promise; if it is canceled, the
// underlying send is canceled
F.fromFutureCancelable(
F.delay((promise.future, cancel))
)
)
}
}

object ProducerImpl {
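Because the outer effect completes once the record is buffered, callers can buffer many records before awaiting any acknowledgements, which lets the underlying producer batch them; the new test below exercises this pattern. A hedged sketch (sendAll is an illustrative name):

import cats.effect.IO
import cats.syntax.all.*
import com.banno.kafka.producer.ProducerApi
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}

def sendAll(
producer: ProducerApi[IO, Int, Int],
records: List[ProducerRecord[Int, Int]],
): IO[List[RecordMetadata]] =
for {
acks <- records.traverse(producer.send) // buffer every record first; the producer may batch them
rms <- acks.sequence // then await all acknowledgements
} yield rms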
227 changes: 227 additions & 0 deletions core/src/test/scala/com/banno/kafka/ProducerSendSpec.scala
@@ -0,0 +1,227 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka

import cats.syntax.all.*
import cats.effect.{Sync, IO}
import munit.CatsEffectSuite
import org.scalacheck.Gen
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.*
import com.banno.kafka.admin.AdminApi
import com.banno.kafka.producer.*
import com.banno.kafka.consumer.*
import java.util.concurrent.{
Future => JFuture,
TimeUnit,
Executors,
CompletableFuture,
}
import scala.concurrent.duration.*

class ProducerSendSpec extends CatsEffectSuite {

val bootstrapServer = "localhost:9092"
val schemaRegistryUrl = "http://localhost:8091"

def randomId: String =
Gen.listOfN(10, Gen.alphaChar).map(_.mkString).sample.get
def genGroupId: String = randomId
def genTopic: String = randomId

def createTestTopic[F[_]: Sync](partitionCount: Int = 1): F[String] = {
val topicName = genTopic
AdminApi
.createTopicsIdempotent[F](
bootstrapServer,
List(new NewTopic(topicName, partitionCount, 1.toShort)),
)
.as(topicName)
}

test("send one record") {
ProducerApi
.resource[IO, String, String](
BootstrapServers(bootstrapServer)
)
.use { producer =>
for {
topic <- createTestTopic[IO]()
ack <- producer.send(new ProducerRecord(topic, "a", "a"))
rm <- ack
} yield {
assertEquals(rm.topic, topic)
assertEquals(rm.offset, 0L)
}
}
}

test("send many records") {
ProducerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer)
)
.use { producer =>
ConsumerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer),
GroupId(genGroupId),
AutoOffsetReset.earliest,
)
.use { consumer =>
for {
topic <- createTestTopic[IO]()
values = (0 to 9).toList
sends = values
.map(v => producer.send(new ProducerRecord(topic, v, v)))
acks <- sends.sequence
rms <- acks.sequence
() <- consumer.subscribe(topic)
records <- consumer
.recordStream(100.millis)
.take(values.size.toLong)
.compile
.toList
} yield {
assertEquals(rms.size, values.size)
for ((rm, i) <- rms.zipWithIndex) {
assertEquals(rm.topic, topic)
assertEquals(rm.offset, i.toLong)
}
assertEquals(values, records.map(_.value))
}
}
}
}

test("outer effect fails on send throw") {
val producer =
ProducerImpl[IO, String, String](ThrowOnSendProducer[String, String]())
for {
topic <- createTestTopic[IO]()
result <- producer.send(new ProducerRecord(topic, "a", "a")).attempt
} yield {
assertEquals(result, Left(SendThrowTestException()))
}
}

test("inner effect fails on callback with exception") {
val producer =
ProducerImpl[IO, String, String](FailedCallbackProducer[String, String]())
for {
topic <- createTestTopic[IO]()
ack <- producer.send(new ProducerRecord(topic, "a", "a"))
result <- ack.attempt
} yield {
assertEquals(result, Left(CallbackFailureTestException()))
}
}

}

case class SendThrowTestException() extends RuntimeException("Send throw test")

// Stub producer whose send(record, callback) throws immediately, used to
// verify that the outer effect of ProducerApi.send fails.
case class ThrowOnSendProducer[K, V]() extends Producer[K, V] {
def send(r: ProducerRecord[K, V], cb: Callback): JFuture[RecordMetadata] =
throw SendThrowTestException()

def abortTransaction(): Unit = ???
def beginTransaction(): Unit = ???
def close(x$1: java.time.Duration): Unit = ???
def close(): Unit = ???
def commitTransaction(): Unit = ???
def flush(): Unit = ???
def initTransactions(): Unit = ???
def metrics(): java.util.Map[
org.apache.kafka.common.MetricName,
_ <: org.apache.kafka.common.Metric,
] = ???
def partitionsFor(
x$1: String
): java.util.List[org.apache.kafka.common.PartitionInfo] = ???
def send(
x$1: org.apache.kafka.clients.producer.ProducerRecord[K, V]
): java.util.concurrent.Future[
org.apache.kafka.clients.producer.RecordMetadata
] = ???
def sendOffsetsToTransaction(
x$1: java.util.Map[
org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndMetadata,
],
x$2: org.apache.kafka.clients.consumer.ConsumerGroupMetadata,
): Unit = ???
def sendOffsetsToTransaction(
x$1: java.util.Map[
org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndMetadata,
],
x$2: String,
): Unit = ???
}

case class CallbackFailureTestException()
extends RuntimeException("Callback throw test")

// Stub producer that never completes the returned future and instead invokes
// the callback with an error after a short delay, used to verify that the
// inner effect of ProducerApi.send fails.
case class FailedCallbackProducer[K, V]() extends Producer[K, V] {
val scheduler = Executors.newSingleThreadScheduledExecutor()
def send(r: ProducerRecord[K, V], cb: Callback): JFuture[RecordMetadata] = {
scheduler.schedule(
new Runnable() {
override def run(): Unit =
cb.onCompletion(null, CallbackFailureTestException())
},
100L,
TimeUnit.MILLISECONDS,
)
new CompletableFuture()
}

def abortTransaction(): Unit = ???
def beginTransaction(): Unit = ???
def close(x$1: java.time.Duration): Unit = ???
def close(): Unit = ???
def commitTransaction(): Unit = ???
def flush(): Unit = ???
def initTransactions(): Unit = ???
def metrics(): java.util.Map[
org.apache.kafka.common.MetricName,
_ <: org.apache.kafka.common.Metric,
] = ???
def partitionsFor(
x$1: String
): java.util.List[org.apache.kafka.common.PartitionInfo] = ???
def send(
x$1: org.apache.kafka.clients.producer.ProducerRecord[K, V]
): java.util.concurrent.Future[
org.apache.kafka.clients.producer.RecordMetadata
] = ???
def sendOffsetsToTransaction(
x$1: java.util.Map[
org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndMetadata,
],
x$2: org.apache.kafka.clients.consumer.ConsumerGroupMetadata,
): Unit = ???
def sendOffsetsToTransaction(
x$1: java.util.Map[
org.apache.kafka.common.TopicPartition,
org.apache.kafka.clients.consumer.OffsetAndMetadata,
],
x$2: String,
): Unit = ???
}
