diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 22cb229c..f275fb7f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -114,15 +114,15 @@ jobs:
run: sbt '++ ${{ matrix.scala }}' unusedCompileDependenciesTest
- name: Make target directories
- if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
+ if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
run: mkdir -p target core/target site/target project/target
- name: Compress target directories
- if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
+ if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
run: tar cf targets.tar target core/target site/target project/target
- name: Upload target directories
- if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
+ if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
uses: actions/upload-artifact@v3
with:
name: target-${{ matrix.os }}-${{ matrix.java }}-${{ matrix.scala }}
@@ -131,7 +131,7 @@ jobs:
publish:
name: Publish Artifacts
needs: [build]
- if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.8')
+ if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.9')
strategy:
matrix:
os: [ubuntu-latest]
@@ -294,7 +294,7 @@ jobs:
run: sbt '++ ${{ matrix.scala }}' docs/tlSite
- name: Publish site
- if: github.event_name != 'pull_request' && github.ref == 'refs/heads/series/0.8'
+ if: github.event_name != 'pull_request' && github.ref == 'refs/heads/series/0.9'
uses: peaceiris/actions-gh-pages@v3.9.0
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
diff --git a/build.sbt b/build.sbt
index 3d5d3aef..c4aeffc2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -36,7 +36,7 @@ ThisBuild / mergifyLabelPaths += "docs" -> file("docs")
val catsV = "2.9.0"
val catsEffectV = "3.4.6"
-val fs2V = "3.5.0"
+val fs2V = "3.6.0"
val scodecV = "1.1.35"
val http4sV = "0.23.18"
val reactiveStreamsV = "1.0.4"
@@ -57,10 +57,8 @@ val coreDeps = Seq(
"org.typelevel" %% "cats-effect-kernel" % catsEffectV,
"org.typelevel" %% "cats-effect-std" % catsEffectV,
"co.fs2" %% "fs2-core" % fs2V,
- "co.fs2" %% "fs2-reactive-streams" % fs2V,
"org.http4s" %% "http4s-client" % http4sV,
"org.http4s" %% "http4s-core" % http4sV,
- "org.reactivestreams" % "reactive-streams" % reactiveStreamsV,
"org.scodec" %% "scodec-bits" % scodecV,
"org.typelevel" %% "vault" % vaultV,
"org.typelevel" %% "case-insensitive" % caseInsensitiveV
@@ -73,7 +71,7 @@ val coreDeps = Seq(
val scala213 = "2.13.10"
ThisBuild / crossScalaVersions := Seq("2.12.17", scala213, "3.2.2")
ThisBuild / scalaVersion := scala213
-ThisBuild / tlBaseVersion := "0.8"
+ThisBuild / tlBaseVersion := "0.9"
ThisBuild / startYear := Some(2019)
ThisBuild / developers := List(
tlGitHubDev("ChristopherDavenport", "Christopher Davenport"),
@@ -83,8 +81,8 @@ ThisBuild / developers := List(
ThisBuild / tlJdkRelease := Some(11)
ThisBuild / githubWorkflowJavaVersions := Seq("11", "17").map(JavaSpec.temurin(_))
-ThisBuild / tlCiReleaseBranches := Seq("series/0.8")
-ThisBuild / tlSitePublishBranch := Some("series/0.8")
+ThisBuild / tlCiReleaseBranches := Seq("series/0.9")
+ThisBuild / tlSitePublishBranch := Some("series/0.9")
lazy val docsSettings =
Seq(
@@ -94,8 +92,9 @@ lazy val docsSettings =
import laika.rewrite._
_.site.versions(
Versions(
- currentVersion = Version("0.8.x", "0.8"),
+ currentVersion = Version("0.9.x", "0.9"),
olderVersions = Seq(
+ Version("0.8.x", "0.8"),
Version("0.7.x", "0.7"),
Version("0.6.x", "0.6.0-M7"),
Version("0.5.x", "0.5.0"),
diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala
index b4bd17b5..1ad54ced 100644
--- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala
+++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala
@@ -23,7 +23,7 @@ import cats.implicits._
import fs2.Chunk
import fs2.Stream
import fs2.concurrent.SignallingRef
-import fs2.interop.reactivestreams._
+import fs2.interop.flow
import org.http4s.Header
import org.http4s.Headers
import org.http4s.HttpVersion
@@ -32,7 +32,6 @@ import org.http4s.Response
import org.http4s.Status
import org.http4s.client.Client
import org.http4s.internal.CollectionCompat.CollectionConverters._
-import org.reactivestreams.FlowAdapters
import org.typelevel.ci.CIString
import java.net.URI
@@ -61,21 +60,19 @@ object JdkHttpClient {
ignoredHeaders: Set[CIString] = restrictedHeaders
)(implicit F: Async[F]): Client[F] = {
def convertRequest(req: Request[F]): Resource[F, HttpRequest] =
- StreamUnicastPublisher(req.body.chunks.map(_.toByteBuffer)).evalMap { publisher =>
+ flow.toPublisher(req.body.chunks.map(_.toByteBuffer)).evalMap { publisher =>
convertHttpVersionFromHttp4s[F](req.httpVersion).map { version =>
val rb = HttpRequest.newBuilder
.method(
- req.method.name, {
- val flowPublisher = FlowAdapters.toFlowPublisher(publisher)
- if (req.isChunked)
- BodyPublishers.fromPublisher(flowPublisher)
- else
- req.contentLength match {
- case Some(length) if length > 0L =>
- BodyPublishers.fromPublisher(flowPublisher, length)
- case _ => BodyPublishers.noBody
- }
- }
+ req.method.name,
+ if (req.isChunked)
+ BodyPublishers.fromPublisher(publisher)
+ else
+ req.contentLength match {
+ case Some(length) if length > 0L =>
+ BodyPublishers.fromPublisher(publisher, length)
+ case _ => BodyPublishers.noBody
+ }
)
.uri(URI.create(req.uri.renderString))
.version(version)
@@ -193,24 +190,20 @@ object JdkHttpClient {
}.uncancelable
}
.flatMap { case (subscription, res) =>
- val body: Stream[F, util.List[ByteBuffer]] =
- Stream
- .eval(StreamSubscriber[F, util.List[ByteBuffer]](1))
- .flatMap(s =>
- s.sub.stream(
- // Complete the TrybleDeferred so that we indicate we have
- // subscribed to the Publisher.
- //
- // This only happens _after_ someone attempts to pull from the
- // body and will never happen if the body is never pulled
- // from. In that case, the AlwaysCancelingSubscriber handles
- // cleanup.
- F.uncancelable { _ =>
- subscription.complete(()) *>
- F.delay(FlowAdapters.toPublisher(res.body).subscribe(s))
- }
- )
- )
+ val body =
+ flow.fromPublisher[F, util.List[ByteBuffer]](1) { subscriber =>
+ // Complete the TrybleDeferred so that we indicate we have
+ // subscribed to the Publisher.
+ //
+ // This only happens _after_ someone attempts to pull from the
+ // body and will never happen if the body is never pulled
+ // from. In that case, the AlwaysCancelingSubscriber handles
+ // cleanup.
+ F.uncancelable { _ =>
+ subscription.complete(()) *>
+ F.delay(res.body.subscribe(subscriber))
+ }
+ }
Resource(
(F.fromEither(Status.fromInt(res.statusCode)), SignallingRef[F, Boolean](false)).mapN {
case (status, signal) =>
diff --git a/docs/index.html b/docs/index.html
index 27e9de85..7138aa18 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -1,4 +1,4 @@
-
-
+
+