From 39296d204f867292738a385b1531541a1c166a23 Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Thu, 28 Dec 2023 18:33:21 -0500 Subject: [PATCH] Refactor cancellation --- .../scala/com/banno/kafka/producer/ProducerImpl.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala b/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala index 54c0544ae..b2ecd1158 100644 --- a/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala +++ b/core/src/main/scala/com/banno/kafka/producer/ProducerImpl.scala @@ -147,20 +147,18 @@ case class ProducerImpl[F[_], K, V](p: Producer[K, V], supervisor: Supervisor[F] // inspired by https://github.com/fd4s/fs2-kafka/blob/series/3.x/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala for { promise <- F.delay(Promise[RecordMetadata]()) - sent <- Deferred[F, Unit] cancelToken <- Deferred[F, F[Unit]] _ <- supervisor.supervise( T.spanR("kafka4s-send").use(wrap => for { cancel <- wrap(T.span("buffer")(sendRaw2(record, promise.complete))) - _ <- sent.complete(()) _ <- cancelToken.complete(cancel) - _ <- wrap(T.span("ack")(F.fromFuture(F.delay(promise.future)))) + _ <- wrap(T.span("ack")(F.fromFutureCancelable(F.delay(promise.future -> F.unit)))) } yield () ) ) - _ <- sent.get - } yield F.fromFuture(F.delay(promise.future)) + token <- cancelToken.get + } yield F.fromFutureCancelable(F.delay(promise.future -> token)) } object ProducerImpl {