Skip to content

Commit

Permalink
Migrate to new FS2 j.u.c.Flow interop APIs (#812)
Browse files Browse the repository at this point in the history
* First attempt to use new flow interop

* Bump to latest flow interop snapshot

* Bump snapshot

* Update fs2-core, fs2-reactive-streams to 3.6.0 in series/0.8

* Bump version everywhere

* Regenerate workflow

---------

Co-authored-by: http4s-steward[bot] <106843772+http4s-steward[bot]@users.noreply.github.com>
  • Loading branch information
armanbilge and http4s-steward[bot] authored Feb 8, 2023
1 parent 0d8f0f3 commit 542b8a1
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 46 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ jobs:
run: sbt '++ ${{ matrix.scala }}' unusedCompileDependenciesTest

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
run: mkdir -p target core/target site/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
run: tar cf targets.tar target core/target site/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
uses: actions/upload-artifact@v3
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-${{ matrix.scala }}
Expand All @@ -131,7 +131,7 @@ jobs:
publish:
name: Publish Artifacts
needs: [build]
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
strategy:
matrix:
os: [ubuntu-latest]
Expand Down Expand Up @@ -294,7 +294,7 @@ jobs:
run: sbt '++ ${{ matrix.scala }}' docs/tlSite

- name: Publish site
if: github.event_name != 'pull_request' && github.ref == 'refs/heads/series/0.8'
if: github.event_name != 'pull_request' && github.ref == 'refs/heads/series/0.9'
uses: peaceiris/[email protected]
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
Expand Down
13 changes: 6 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ThisBuild / mergifyLabelPaths += "docs" -> file("docs")

val catsV = "2.9.0"
val catsEffectV = "3.4.6"
val fs2V = "3.5.0"
val fs2V = "3.6.0"
val scodecV = "1.1.35"
val http4sV = "0.23.18"
val reactiveStreamsV = "1.0.4"
Expand All @@ -57,10 +57,8 @@ val coreDeps = Seq(
"org.typelevel" %% "cats-effect-kernel" % catsEffectV,
"org.typelevel" %% "cats-effect-std" % catsEffectV,
"co.fs2" %% "fs2-core" % fs2V,
"co.fs2" %% "fs2-reactive-streams" % fs2V,
"org.http4s" %% "http4s-client" % http4sV,
"org.http4s" %% "http4s-core" % http4sV,
"org.reactivestreams" % "reactive-streams" % reactiveStreamsV,
"org.scodec" %% "scodec-bits" % scodecV,
"org.typelevel" %% "vault" % vaultV,
"org.typelevel" %% "case-insensitive" % caseInsensitiveV
Expand All @@ -73,7 +71,7 @@ val coreDeps = Seq(
val scala213 = "2.13.10"
ThisBuild / crossScalaVersions := Seq("2.12.17", scala213, "3.2.2")
ThisBuild / scalaVersion := scala213
ThisBuild / tlBaseVersion := "0.8"
ThisBuild / tlBaseVersion := "0.9"
ThisBuild / startYear := Some(2019)
ThisBuild / developers := List(
tlGitHubDev("ChristopherDavenport", "Christopher Davenport"),
Expand All @@ -83,8 +81,8 @@ ThisBuild / developers := List(

ThisBuild / tlJdkRelease := Some(11)
ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_))
ThisBuild / tlCiReleaseBranches := Seq("series/0.8")
ThisBuild / tlSitePublishBranch := Some("series/0.8")
ThisBuild / tlCiReleaseBranches := Seq("series/0.9")
ThisBuild / tlSitePublishBranch := Some("series/0.9")

lazy val docsSettings =
Seq(
Expand All @@ -94,8 +92,9 @@ lazy val docsSettings =
import laika.rewrite._
_.site.versions(
Versions(
currentVersion = Version("0.8.x", "0.8"),
currentVersion = Version("0.9.x", "0.9"),
olderVersions = Seq(
Version("0.8.x", "0.8"),
Version("0.7.x", "0.7"),
Version("0.6.x", "0.6.0-M7"),
Version("0.5.x", "0.5.0"),
Expand Down
57 changes: 25 additions & 32 deletions core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cats.implicits._
import fs2.Chunk
import fs2.Stream
import fs2.concurrent.SignallingRef
import fs2.interop.reactivestreams._
import fs2.interop.flow
import org.http4s.Header
import org.http4s.Headers
import org.http4s.HttpVersion
Expand All @@ -32,7 +32,6 @@ import org.http4s.Response
import org.http4s.Status
import org.http4s.client.Client
import org.http4s.internal.CollectionCompat.CollectionConverters._
import org.reactivestreams.FlowAdapters
import org.typelevel.ci.CIString

import java.net.URI
Expand Down Expand Up @@ -61,21 +60,19 @@ object JdkHttpClient {
ignoredHeaders: Set[CIString] = restrictedHeaders
)(implicit F: Async[F]): Client[F] = {
def convertRequest(req: Request[F]): Resource[F, HttpRequest] =
StreamUnicastPublisher(req.body.chunks.map(_.toByteBuffer)).evalMap { publisher =>
flow.toPublisher(req.body.chunks.map(_.toByteBuffer)).evalMap { publisher =>
convertHttpVersionFromHttp4s[F](req.httpVersion).map { version =>
val rb = HttpRequest.newBuilder
.method(
req.method.name, {
val flowPublisher = FlowAdapters.toFlowPublisher(publisher)
if (req.isChunked)
BodyPublishers.fromPublisher(flowPublisher)
else
req.contentLength match {
case Some(length) if length > 0L =>
BodyPublishers.fromPublisher(flowPublisher, length)
case _ => BodyPublishers.noBody
}
}
req.method.name,
if (req.isChunked)
BodyPublishers.fromPublisher(publisher)
else
req.contentLength match {
case Some(length) if length > 0L =>
BodyPublishers.fromPublisher(publisher, length)
case _ => BodyPublishers.noBody
}
)
.uri(URI.create(req.uri.renderString))
.version(version)
Expand Down Expand Up @@ -193,24 +190,20 @@ object JdkHttpClient {
}.uncancelable
}
.flatMap { case (subscription, res) =>
val body: Stream[F, util.List[ByteBuffer]] =
Stream
.eval(StreamSubscriber[F, util.List[ByteBuffer]](1))
.flatMap(s =>
s.sub.stream(
// Complete the TrybleDeferred so that we indicate we have
// subscribed to the Publisher.
//
// This only happens _after_ someone attempts to pull from the
// body and will never happen if the body is never pulled
// from. In that case, the AlwaysCancelingSubscriber handles
// cleanup.
F.uncancelable { _ =>
subscription.complete(()) *>
F.delay(FlowAdapters.toPublisher(res.body).subscribe(s))
}
)
)
val body =
flow.fromPublisher[F, util.List[ByteBuffer]](1) { subscriber =>
// Complete the TrybleDeferred so that we indicate we have
// subscribed to the Publisher.
//
// This only happens _after_ someone attempts to pull from the
// body and will never happen if the body is never pulled
// from. In that case, the AlwaysCancelingSubscriber handles
// cleanup.
F.uncancelable { _ =>
subscription.complete(()) *>
F.delay(res.body.subscribe(subscriber))
}
}
Resource(
(F.fromEither(Status.fromInt(res.statusCode)), SignallingRef[F, Boolean](false)).mapN {
case (status, signal) =>
Expand Down
4 changes: 2 additions & 2 deletions docs/index.html
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<!DOCTYPE html>
<meta charset="utf-8">
<meta http-equiv="refresh" content="0; URL=0.8">
<link rel="canonical" href="0.8">
<meta http-equiv="refresh" content="0; URL=0.9">
<link rel="canonical" href="0.9">

0 comments on commit 542b8a1

Please sign in to comment.