From 542b8a1a326be00dda541f68163ca258cbe1c563 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 8 Feb 2023 10:11:15 -0800 Subject: [PATCH] Migrate to new FS2 `j.u.c.Flow` interop APIs (#812) * First attempt to use new flow interop * Bump to latest flow interop snapshot * Bump snapshot * Update fs2-core, fs2-reactive-streams to 3.6.0 in series/0.8 * Bump version everywhere * Regenerate workflow --------- Co-authored-by: http4s-steward[bot] <106843772+http4s-steward[bot]@users.noreply.github.com> --- .github/workflows/ci.yml | 10 ++-- build.sbt | 13 ++--- .../http4s/jdkhttpclient/JdkHttpClient.scala | 57 ++++++++----------- docs/index.html | 4 +- 4 files changed, 38 insertions(+), 46 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22cb229c..f275fb7f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,15 +114,15 @@ jobs: run: sbt '++ ${{ matrix.scala }}' unusedCompileDependenciesTest - name: Make target directories - if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8') + if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9') run: mkdir -p target core/target site/target project/target - name: Compress target directories - if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8') + if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9') run: tar cf targets.tar target core/target site/target project/target - name: Upload target directories - if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8') + if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9') uses: actions/upload-artifact@v3 with: name: target-${{ matrix.os }}-${{ matrix.java }}-${{ matrix.scala }} @@ -131,7 +131,7 @@ jobs: publish: name: Publish Artifacts needs: [build] - if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8') + if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9') strategy: matrix: os: [ubuntu-latest] @@ -294,7 +294,7 @@ jobs: run: sbt '++ ${{ matrix.scala }}' docs/tlSite - name: Publish site - if: github.event_name != 'pull_request' && github.ref == 'refs/heads/series/0.8' + if: github.event_name != 'pull_request' && github.ref == 'refs/heads/series/0.9' uses: peaceiris/actions-gh-pages@v3.9.0 with: github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/build.sbt b/build.sbt index 3d5d3aef..c4aeffc2 100644 --- a/build.sbt +++ b/build.sbt @@ -36,7 +36,7 @@ ThisBuild / mergifyLabelPaths += "docs" -> file("docs") val catsV = "2.9.0" val catsEffectV = "3.4.6" -val fs2V = "3.5.0" +val fs2V = "3.6.0" val scodecV = "1.1.35" val http4sV = "0.23.18" val reactiveStreamsV = "1.0.4" @@ -57,10 +57,8 @@ val coreDeps = Seq( "org.typelevel" %% "cats-effect-kernel" % catsEffectV, "org.typelevel" %% "cats-effect-std" % catsEffectV, "co.fs2" %% "fs2-core" % fs2V, - "co.fs2" %% "fs2-reactive-streams" % fs2V, "org.http4s" %% "http4s-client" % http4sV, "org.http4s" %% "http4s-core" % http4sV, - "org.reactivestreams" % "reactive-streams" % reactiveStreamsV, "org.scodec" %% "scodec-bits" % scodecV, "org.typelevel" %% "vault" % vaultV, "org.typelevel" %% "case-insensitive" % caseInsensitiveV @@ -73,7 +71,7 @@ val coreDeps = Seq( val scala213 = "2.13.10" ThisBuild / crossScalaVersions := Seq("2.12.17", scala213, "3.2.2") ThisBuild / scalaVersion := scala213 -ThisBuild / tlBaseVersion := "0.8" +ThisBuild / tlBaseVersion := "0.9" ThisBuild / startYear := Some(2019) ThisBuild / developers := List( tlGitHubDev("ChristopherDavenport", "Christopher Davenport"), @@ -83,8 +81,8 @@ ThisBuild / developers := List( ThisBuild / tlJdkRelease := Some(11) ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_)) -ThisBuild / tlCiReleaseBranches := Seq("series/0.8") -ThisBuild / tlSitePublishBranch := Some("series/0.8") +ThisBuild / tlCiReleaseBranches := Seq("series/0.9") +ThisBuild / tlSitePublishBranch := Some("series/0.9") lazy val docsSettings = Seq( @@ -94,8 +92,9 @@ lazy val docsSettings = import laika.rewrite._ _.site.versions( Versions( - currentVersion = Version("0.8.x", "0.8"), + currentVersion = Version("0.9.x", "0.9"), olderVersions = Seq( + Version("0.8.x", "0.8"), Version("0.7.x", "0.7"), Version("0.6.x", "0.6.0-M7"), Version("0.5.x", "0.5.0"), diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala index b4bd17b5..1ad54ced 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala @@ -23,7 +23,7 @@ import cats.implicits._ import fs2.Chunk import fs2.Stream import fs2.concurrent.SignallingRef -import fs2.interop.reactivestreams._ +import fs2.interop.flow import org.http4s.Header import org.http4s.Headers import org.http4s.HttpVersion @@ -32,7 +32,6 @@ import org.http4s.Response import org.http4s.Status import org.http4s.client.Client import org.http4s.internal.CollectionCompat.CollectionConverters._ -import org.reactivestreams.FlowAdapters import org.typelevel.ci.CIString import java.net.URI @@ -61,21 +60,19 @@ object JdkHttpClient { ignoredHeaders: Set[CIString] = restrictedHeaders )(implicit F: Async[F]): Client[F] = { def convertRequest(req: Request[F]): Resource[F, HttpRequest] = - StreamUnicastPublisher(req.body.chunks.map(_.toByteBuffer)).evalMap { publisher => + flow.toPublisher(req.body.chunks.map(_.toByteBuffer)).evalMap { publisher => convertHttpVersionFromHttp4s[F](req.httpVersion).map { version => val rb = HttpRequest.newBuilder .method( - req.method.name, { - val flowPublisher = FlowAdapters.toFlowPublisher(publisher) - if (req.isChunked) - BodyPublishers.fromPublisher(flowPublisher) - else - req.contentLength match { - case Some(length) if length > 0L => - BodyPublishers.fromPublisher(flowPublisher, length) - case _ => BodyPublishers.noBody - } - } + req.method.name, + if (req.isChunked) + BodyPublishers.fromPublisher(publisher) + else + req.contentLength match { + case Some(length) if length > 0L => + BodyPublishers.fromPublisher(publisher, length) + case _ => BodyPublishers.noBody + } ) .uri(URI.create(req.uri.renderString)) .version(version) @@ -193,24 +190,20 @@ object JdkHttpClient { }.uncancelable } .flatMap { case (subscription, res) => - val body: Stream[F, util.List[ByteBuffer]] = - Stream - .eval(StreamSubscriber[F, util.List[ByteBuffer]](1)) - .flatMap(s => - s.sub.stream( - // Complete the TrybleDeferred so that we indicate we have - // subscribed to the Publisher. - // - // This only happens _after_ someone attempts to pull from the - // body and will never happen if the body is never pulled - // from. In that case, the AlwaysCancelingSubscriber handles - // cleanup. - F.uncancelable { _ => - subscription.complete(()) *> - F.delay(FlowAdapters.toPublisher(res.body).subscribe(s)) - } - ) - ) + val body = + flow.fromPublisher[F, util.List[ByteBuffer]](1) { subscriber => + // Complete the TrybleDeferred so that we indicate we have + // subscribed to the Publisher. + // + // This only happens _after_ someone attempts to pull from the + // body and will never happen if the body is never pulled + // from. In that case, the AlwaysCancelingSubscriber handles + // cleanup. + F.uncancelable { _ => + subscription.complete(()) *> + F.delay(res.body.subscribe(subscriber)) + } + } Resource( (F.fromEither(Status.fromInt(res.statusCode)), SignallingRef[F, Boolean](false)).mapN { case (status, signal) => diff --git a/docs/index.html b/docs/index.html index 27e9de85..7138aa18 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1,4 +1,4 @@ - - + +