Skip to content

Commit

Permalink
Refactor cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Dec 28, 2023
1 parent 0135dde commit 39296d2
Showing 1 changed file with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,18 @@ case class ProducerImpl[F[_], K, V](p: Producer[K, V], supervisor: Supervisor[F]
// inspired by https://github.com/fd4s/fs2-kafka/blob/series/3.x/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
for {
promise <- F.delay(Promise[RecordMetadata]())
sent <- Deferred[F, Unit]
cancelToken <- Deferred[F, F[Unit]]
_ <- supervisor.supervise(
T.spanR("kafka4s-send").use(wrap =>
for {
cancel <- wrap(T.span("buffer")(sendRaw2(record, promise.complete)))
_ <- sent.complete(())
_ <- cancelToken.complete(cancel)
_ <- wrap(T.span("ack")(F.fromFuture(F.delay(promise.future))))
_ <- wrap(T.span("ack")(F.fromFutureCancelable(F.delay(promise.future -> F.unit))))
} yield ()
)
)
_ <- sent.get
} yield F.fromFuture(F.delay(promise.future))
token <- cancelToken.get
} yield F.fromFutureCancelable(F.delay(promise.future -> token))
}

object ProducerImpl {
Expand Down

0 comments on commit 39296d2

Please sign in to comment.