Skip to content

Commit

Permalink
Merge branch 'series/2.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
runtologist committed Jun 27, 2022
2 parents deaaeb6 + 2f25e15 commit b516749
Show file tree
Hide file tree
Showing 13 changed files with 633 additions and 465 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ name: CI
on:
pull_request:
push:
branches: ['master']
branches: [
'master'
]
release:
types:
- published
Expand Down Expand Up @@ -31,7 +33,7 @@ jobs:
fail-fast: false
matrix:
java: ['[email protected]', '[email protected]']
scala: ['2.11.12', '2.12.15', '2.13.7', '3.1.0']
scala: ['2.11.12', '2.12.15', '2.13.8', '3.1.0']
steps:
- name: Checkout current branch
uses: actions/[email protected]
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ project/secrets.tar.xz
project/travis-deploy-key
project/zecret
target
test-output
test-output/
metals.sbt
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
version = "3.0.4"
runner.dialect = scala213
runner.dialect = scala213source3
maxColumn = 120
align.preset = most
align.multiline = false
Expand Down
37 changes: 15 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![Releases][Badge-SonatypeReleases]][Link-SonatypeReleases]
[![Snapshots][Badge-SonatypeSnapshots]][Link-SonatypeSnapshots]

This library provides an interoperability layer for reactive streams.
This library provides an interoperability layer between ZIO and reactive streams.

## Reactive Streams `Producer` and `Subscriber`

Expand All @@ -22,8 +22,6 @@ import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._

val runtime = new DefaultRuntime {}
```

We use the following `Publisher` and `Subscriber` for the examples:
Expand All @@ -44,37 +42,32 @@ A `Publisher` used as a `Stream` buffers up to `qSize` elements. If possible, `q
a power of two for best performance. The default is 16.

```scala mdoc
val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
streamFromPublisher.run(Sink.collectAll[Integer])
)
val streamFromPublisher = publisher.toZIOStream(qSize = 16)
streamFromPublisher.run(Sink.collectAll[Integer])
```

### Subscriber to Sink

When running a `Stream` to a `Subscriber`, a side channel is needed for signalling failures.
For this reason `toSink` returns a tuple of `Promise` and `Sink`. The `Promise` must be failed
on `Stream` failure. The type parameter on `toSink` is the error type of *the Stream*.
For this reason `toZIOSink` returns a tuple of a callback and a `Sink`. The callback must be used to signal `Stream` failure. The type parameter on `toZIOSink` is the error type of *the Stream*.

```scala mdoc
val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
asSink.flatMap { case (errorP, sink) =>
failingStream.run(sink).catchAll(errorP.fail)
val asSink = subscriber.toZIOSink[Throwable]
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
ZIO.scoped {
asSink.flatMap { case (signalError, sink) => // FIXME
failingStream.run(sink).catchAll(signalError)
}
)
}
```

### Stream to Publisher

```scala mdoc
val stream = Stream.range(3, 13)
runtime.unsafeRun(
stream.toPublisher.flatMap { publisher =>
UIO(publisher.subscribe(subscriber))
}
)
stream.toPublisher.flatMap { publisher =>
UIO(publisher.subscribe(subscriber))
}
```

### Sink to Subscriber
Expand All @@ -86,11 +79,11 @@ a power of two for best performance. The default is 16.

```scala mdoc
val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
ZIO.scoped {
sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
UIO(publisher.subscribe(subscriber)) *> result
}
)
}
```

[Badge-CI]: https://github.com/zio/interop-reactive-streams/workflows/CI/badge.svg
Expand Down
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ Global / onChangedBuildSource := ReloadOnSourceChanges
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "1.0.15"
val rsVersion = "1.0.3"
val collCompatVersion = "2.5.0"
val zioVersion = "2.0.0"
val rsVersion = "1.0.4"
val collCompatVersion = "2.7.0"

lazy val interopReactiveStreams = project
.in(file("."))
Expand All @@ -63,3 +63,5 @@ lazy val interopReactiveStreams = project
Seq("org.scala-lang.modules" %% "scala-collection-compat" % collCompatVersion % Test)
}
)
// .settings(Test / javaOptions += "-XX:ActiveProcessorCount=1") // uncomment to test for deadlocks
.settings(Test / fork := true)
26 changes: 1 addition & 25 deletions project/BuildHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import scalafix.sbt.ScalafixPlugin.autoImport._
object BuildHelper {
val Scala211 = "2.11.12"
val Scala212 = "2.12.15"
val Scala213 = "2.13.7"
val Scala213 = "2.13.8"
val ScalaDotty = "3.1.0"

private val stdOptions = Seq(
Expand Down Expand Up @@ -189,30 +189,6 @@ object BuildHelper {
case _ =>
Nil
}
},
Test / unmanagedSourceDirectories ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, x)) if x <= 11 =>
Seq(
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.11")),
CrossType.Full.sharedSrcDir(baseDirectory.value, "test").toList.map(f => file(f.getPath + "-2.x"))
).flatten
case Some((2, x)) if x >= 12 =>
Seq(
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.12")),
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.12+")),
CrossType.Full.sharedSrcDir(baseDirectory.value, "test").toList.map(f => file(f.getPath + "-2.x"))
).flatten
case Some((3, _)) =>
Seq(
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.12+")),
CrossType.Full.sharedSrcDir(baseDirectory.value, "main").toList.map(f => file(f.getPath + "-2.12+")),
CrossType.Full.sharedSrcDir(baseDirectory.value, "test").toList.map(f => file(f.getPath + "-dotty"))
).flatten
case _ =>
Nil
}

}
)

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.5
sbt.version=1.6.2
Loading

0 comments on commit b516749

Please sign in to comment.