Skip to content

Commit

Permalink
fix race in Adapters.publisherToStream (#339)
Browse files Browse the repository at this point in the history
* fix race in Adapters.publisherToStream

* fix race in Adapters.publisherToStream

Co-authored-by: Simon Schenk <[email protected]>
  • Loading branch information
darl and runtologist authored Oct 16, 2022
1 parent 467d9ee commit 74fa6d0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ object Adapters {

override def await(): IO[Option[Throwable], Unit] =
done match {
case Some(value) => ZIO.fail(value)
case Some(value) =>
if (q.isEmpty()) ZIO.fail(value) else ZIO.unit
case None =>
val p = Promise.unsafe.make[Option[Throwable], Unit](FiberId.None)
toNotify = Some(p)
Expand All @@ -161,7 +162,7 @@ object Adapters {
done.fold(p.await) { e =>
// The producer has canceled or errored in the meantime.
toNotify = None
ZIO.fail(e)
if (q.isEmpty()) ZIO.fail(e) else ZIO.unit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zio.interop.reactivestreams
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import org.reactivestreams.example.unicast.NumberIterablePublisher
import org.reactivestreams.tck.TestEnvironment
import org.reactivestreams.tck.TestEnvironment.ManualPublisher
import zio.Chunk
Expand Down Expand Up @@ -183,6 +184,16 @@ object PublisherToStreamSpec extends ZIOSpecDefault {
succeeds(isUnit)
)
)
},
test("collect all messages") {
for {
executor <- ZIO.executor
sum <- ZIO
.foreach((1 to 10000).toVector) { _ =>
Adapters.publisherToStream(new NumberIterablePublisher(0, 1, executor.asJava), 16).runCount
}
.map(_.sum)
} yield assert(sum)(equalTo(10000L))
}
)

Expand Down

0 comments on commit 74fa6d0

Please sign in to comment.