From c42170e95baa1e796886ad9a15856b4c34fdc269 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Sun, 9 Jun 2019 09:53:20 -0400 Subject: [PATCH 1/3] Replaced blockingExecutionContext with Blocker --- README.md | 12 +-- build.sbt | 4 +- .../jvm/src/test/scala/fs2/TestPlatform.scala | 19 ----- core/shared/src/main/scala/fs2/Stream.scala | 27 +++--- docs/ReadmeExample.md | 37 ++++---- docs/src/ReadmeExample.md | 31 ++++--- .../main/scala/fs2/io/file/FileHandle.scala | 30 +++---- io/src/main/scala/fs2/io/file/Watcher.scala | 38 ++++----- io/src/main/scala/fs2/io/file/file.scala | 28 +++--- io/src/main/scala/fs2/io/file/pulls.scala | 17 ++-- io/src/main/scala/fs2/io/io.scala | 85 ++++++------------- .../main/scala/fs2/io/tcp/SocketGroup.scala | 39 ++++----- .../fs2/io/udp/AsynchronousSocketGroup.scala | 9 +- .../main/scala/fs2/io/udp/SocketGroup.scala | 25 +++--- io/src/test/scala/fs2/io/IoSpec.scala | 14 +-- io/src/test/scala/fs2/io/file/FileSpec.scala | 12 +-- .../test/scala/fs2/io/file/WatcherSpec.scala | 10 +-- io/src/test/scala/fs2/io/tcp/SocketSpec.scala | 4 +- io/src/test/scala/fs2/io/udp/UdpSpec.scala | 4 +- 19 files changed, 177 insertions(+), 268 deletions(-) diff --git a/README.md b/README.md index 220e4e44b1..4ace55010c 100644 --- a/README.md +++ b/README.md @@ -73,29 +73,25 @@ libraryDependencies += "co.fs2" %%% "fs2-core" % "1.0.4" FS2 is a streaming I/O library. The design goals are compositionality, expressiveness, resource safety, and speed. Here's a simple example of its use: ```scala -import cats.effect.{ExitCode, IO, IOApp, Resource} +import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource} import cats.implicits._ import fs2.{io, text, Stream} import java.nio.file.Paths -import java.util.concurrent.Executors -import scala.concurrent.ExecutionContext object Converter extends IOApp { - private val blockingExecutionContext = - Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown())) - val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC => + val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker => def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0) - io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096) + io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096) .through(text.utf8Decode) .through(text.lines) .filter(s => !s.trim.isEmpty && !s.startsWith("//")) .map(line => fahrenheitToCelsius(line.toDouble).toString) .intersperse("\n") .through(text.utf8Encode) - .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC)) + .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker)) } def run(args: List[String]): IO[ExitCode] = diff --git a/build.sbt b/build.sbt index 8ee850eee1..f95560bc99 100644 --- a/build.sbt +++ b/build.sbt @@ -56,8 +56,8 @@ lazy val commonSettings = Seq( compilerPlugin("org.typelevel" %% "kind-projector" % "0.10.1"), "org.typelevel" %%% "cats-core" % "2.0.0-M2", "org.typelevel" %%% "cats-laws" % "2.0.0-M2" % "test", - "org.typelevel" %%% "cats-effect" % "2.0.0-M2", - "org.typelevel" %%% "cats-effect-laws" % "2.0.0-M2" % "test", + "org.typelevel" %%% "cats-effect" % "2.0.0-637eb93", + "org.typelevel" %%% "cats-effect-laws" % "2.0.0-637eb93" % "test", "org.scalacheck" %%% "scalacheck" % "1.14.0" % "test", "org.scalatest" %%% "scalatest" % "3.1.0-SNAP11" % "test", "org.scalatestplus" %%% "scalatestplus-scalacheck" % "1.0.0-SNAP6" % "test" diff --git a/core/jvm/src/test/scala/fs2/TestPlatform.scala b/core/jvm/src/test/scala/fs2/TestPlatform.scala index 4da3e8cf20..763255bcfa 100644 --- a/core/jvm/src/test/scala/fs2/TestPlatform.scala +++ b/core/jvm/src/test/scala/fs2/TestPlatform.scala @@ -1,24 +1,5 @@ package fs2 -import scala.concurrent.ExecutionContext - -import cats.effect.{IO, Resource} -import cats.implicits._ - -import java.util.concurrent.Executors - -import fs2.internal.ThreadFactories - trait TestPlatform { - def isJVM: Boolean = true - - val blockingExecutionContext: Resource[IO, ExecutionContext] = - Resource - .make( - IO(ExecutionContext.fromExecutorService( - Executors.newCachedThreadPool(ThreadFactories.named("fs2-blocking", true)))))(ec => - IO(ec.shutdown())) - .widen[ExecutionContext] - } diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 178510b480..5f49f22b04 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -13,7 +13,6 @@ import fs2.internal.{Resource => _, _} import java.io.PrintStream import scala.annotation.tailrec -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ /** @@ -1573,7 +1572,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * Writes this stream of strings to the supplied `PrintStream`. * * Note: printing to the `PrintStream` is performed *synchronously*. - * Use `linesAsync(out, blockingEc)` if synchronous writes are a concern. + * Use `linesAsync(out, blocker)` if synchronous writes are a concern. */ def lines[F2[x] >: F[x]](out: PrintStream)(implicit F: Sync[F2], ev: O <:< String): Stream[F2, Unit] = { @@ -1587,13 +1586,13 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * * Note: printing to the `PrintStream` is performed on the supplied blocking execution context. */ - def linesAsync[F2[x] >: F[x]](out: PrintStream, blockingExecutionContext: ExecutionContext)( + def linesAsync[F2[x] >: F[x]](out: PrintStream, blocker: Blocker)( implicit F: Sync[F2], cs: ContextShift[F2], ev: O <:< String): Stream[F2, Unit] = { val _ = ev val src = this.asInstanceOf[Stream[F2, String]] - src.evalMap(str => cs.evalOn(blockingExecutionContext)(F.delay(out.println(str)))) + src.evalMap(str => blocker.delay(out.println(str))) } /** @@ -2348,7 +2347,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * Writes this stream to the supplied `PrintStream`, converting each element to a `String` via `Show`. * * Note: printing to the `PrintStream` is performed *synchronously*. - * Use `showLinesAsync(out, blockingEc)` if synchronous writes are a concern. + * Use `showLinesAsync(out, blocker)` if synchronous writes are a concern. */ def showLines[F2[x] >: F[x], O2 >: O](out: PrintStream)(implicit F: Sync[F2], showO: Show[O2]): Stream[F2, Unit] = @@ -2359,12 +2358,10 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * * Note: printing to the `PrintStream` is performed on the supplied blocking execution context. */ - def showLinesAsync[F2[x] >: F[x], O2 >: O](out: PrintStream, - blockingExecutionContext: ExecutionContext)( - implicit F: Sync[F2], - cs: ContextShift[F2], - showO: Show[O2]): Stream[F2, Unit] = - covaryAll[F2, O2].map(_.show).linesAsync(out, blockingExecutionContext) + def showLinesAsync[F2[x] >: F[x]: Sync: ContextShift, O2 >: O: Show]( + out: PrintStream, + blocker: Blocker): Stream[F2, Unit] = + covaryAll[F2, O2].map(_.show).linesAsync(out, blocker) /** * Writes this stream to standard out, converting each element to a `String` via `Show`. @@ -2381,11 +2378,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing, * * Note: printing to the `PrintStream` is performed on the supplied blocking execution context. */ - def showLinesStdOutAsync[F2[x] >: F[x], O2 >: O](blockingExecutionContext: ExecutionContext)( - implicit F: Sync[F2], - cs: ContextShift[F2], - showO: Show[O2]): Stream[F2, Unit] = - showLinesAsync[F2, O2](Console.out, blockingExecutionContext) + def showLinesStdOutAsync[F2[x] >: F[x]: Sync: ContextShift, O2 >: O: Show]( + blocker: Blocker): Stream[F2, Unit] = + showLinesAsync[F2, O2](Console.out, blocker) /** * Groups inputs in fixed size chunks by passing a "sliding window" diff --git a/docs/ReadmeExample.md b/docs/ReadmeExample.md index 08d1b8a939..5e303423f2 100644 --- a/docs/ReadmeExample.md +++ b/docs/ReadmeExample.md @@ -3,8 +3,8 @@ This walks through the implementation of the example given in [the README](../README.md). This program opens a file, `fahrenheit.txt`, containing temperatures in degrees fahrenheit, one per line, and converts each temperature to celsius, incrementally writing to the file `celsius.txt`. Both files will be closed, regardless of whether any errors occur. ```scala -import cats.effect.{ExitCode, IO, IOApp, Resource} -// import cats.effect.{ExitCode, IO, IOApp, Resource} +import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource} +// import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource} import cats.implicits._ // import cats.implicits._ @@ -15,28 +15,20 @@ import fs2.{io, text, Stream} import java.nio.file.Paths // import java.nio.file.Paths -import java.util.concurrent.Executors -// import java.util.concurrent.Executors - -import scala.concurrent.ExecutionContext -// import scala.concurrent.ExecutionContext - object Converter extends IOApp { - private val blockingExecutionContext = - Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown())) - val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC => + val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker => def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0) - io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096) + io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096) .through(text.utf8Decode) .through(text.lines) .filter(s => !s.trim.isEmpty && !s.startsWith("//")) .map(line => fahrenheitToCelsius(line.toDouble).toString) .intersperse("\n") .through(text.utf8Encode) - .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC)) + .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker)) } def run(args: List[String]): IO[ExitCode] = @@ -54,7 +46,7 @@ Operations on `Stream` are defined for any choice of type constructor, not just `fs2.io` has a number of helper functions for constructing or working with streams that talk to the outside world. `readAll` creates a stream of bytes from a file name (specified via a `java.nio.file.Path`). It encapsulates the logic for opening and closing the file, so that users of this stream do not need to remember to close the file when they are done or in the event of exceptions during processing of the stream. ```scala -import cats.effect.{ContextShift, IO} +import cats.effect.{Blocker, ContextShift, IO} import fs2.{io, text} import java.nio.file.Paths import java.util.concurrent.Executors @@ -62,9 +54,12 @@ import scala.concurrent.ExecutionContext implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global) -//note: this should be shut down when it's no longer necessary - normally that's at the end of your app. -//See the whole README example for proper resource management in terms of ExecutionContexts. -val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) +// Note: to make these examples work in docs, we create a `Blocker` manually here but in real code, +// we should always use `Blocker[IO]`, which returns the blocker as a resource that shuts down the pool +// upon finalization, like in the original example. +// See the whole README example for proper resource management in terms of `Blocker`. +val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) +val blocker: Blocker = Blocker.unsafeFromExecutionContext(blockingPool) def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0) @@ -75,7 +70,7 @@ scala> import fs2.Stream import fs2.Stream scala> val src: Stream[IO, Byte] = - | io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096) + | io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096) src: fs2.Stream[cats.effect.IO,Byte] = Stream(..) ``` @@ -118,7 +113,7 @@ encodedBytes: fs2.Stream[cats.effect.IO,Byte] = Stream(..) We then write the encoded bytes to a file. Note that nothing has happened at this point -- we are just constructing a description of a computation that, when interpreted, will incrementally consume the stream, sending converted values to the specified file. ```scala -scala> val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext)) +scala> val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker)) written: fs2.Stream[cats.effect.IO,Unit] = Stream(..) ``` @@ -131,8 +126,8 @@ task: cats.effect.IO[Unit] = We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. In this example, we extended `IOApp`, which lets us express our overall program as an `IO[ExitCase]`. The `IOApp` class handles running the task and hooking it up to the application entry point. -Let's shut down the ExecutionContext that we allocated earlier. +Let's shut down the thread pool that we allocated earlier -- reminder: in real code, we would not manually control the lifecycle of the blocking thread pool -- we'd use the resource returned from `Blocker[IO]` to manage it automatically, like in the full example we started with. ```scala -scala> blockingExecutionContext.shutdown() +scala> blockingPool.shutdown() ``` diff --git a/docs/src/ReadmeExample.md b/docs/src/ReadmeExample.md index 566613c797..fb10208f3b 100644 --- a/docs/src/ReadmeExample.md +++ b/docs/src/ReadmeExample.md @@ -3,29 +3,25 @@ This walks through the implementation of the example given in [the README](../README.md). This program opens a file, `fahrenheit.txt`, containing temperatures in degrees fahrenheit, one per line, and converts each temperature to celsius, incrementally writing to the file `celsius.txt`. Both files will be closed, regardless of whether any errors occur. ```tut:book -import cats.effect.{ExitCode, IO, IOApp, Resource} +import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource} import cats.implicits._ import fs2.{io, text, Stream} import java.nio.file.Paths -import java.util.concurrent.Executors -import scala.concurrent.ExecutionContext object Converter extends IOApp { - private val blockingExecutionContext = - Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown())) - val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC => + val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker => def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0) - io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096) + io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096) .through(text.utf8Decode) .through(text.lines) .filter(s => !s.trim.isEmpty && !s.startsWith("//")) .map(line => fahrenheitToCelsius(line.toDouble).toString) .intersperse("\n") .through(text.utf8Encode) - .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC)) + .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker)) } def run(args: List[String]): IO[ExitCode] = @@ -42,7 +38,7 @@ Operations on `Stream` are defined for any choice of type constructor, not just `fs2.io` has a number of helper functions for constructing or working with streams that talk to the outside world. `readAll` creates a stream of bytes from a file name (specified via a `java.nio.file.Path`). It encapsulates the logic for opening and closing the file, so that users of this stream do not need to remember to close the file when they are done or in the event of exceptions during processing of the stream. ```tut:silent -import cats.effect.{ContextShift, IO} +import cats.effect.{Blocker, ContextShift, IO} import fs2.{io, text} import java.nio.file.Paths import java.util.concurrent.Executors @@ -50,9 +46,12 @@ import scala.concurrent.ExecutionContext implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global) -//note: this should be shut down when it's no longer necessary - normally that's at the end of your app. -//See the whole README example for proper resource management in terms of ExecutionContexts. -val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) +// Note: to make these examples work in docs, we create a `Blocker` manually here but in real code, +// we should always use `Blocker[IO]`, which returns the blocker as a resource that shuts down the pool +// upon finalization, like in the original example. +// See the whole README example for proper resource management in terms of `Blocker`. +val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) +val blocker: Blocker = Blocker.unsafeFromExecutionContext(blockingPool) def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0) @@ -62,7 +61,7 @@ def fahrenheitToCelsius(f: Double): Double = import fs2.Stream val src: Stream[IO, Byte] = - io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096) + io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096) ``` A stream can be attached to a pipe, allowing for stateful transformations of the input values. Here, we attach the source stream to the `text.utf8Decode` pipe, which converts the stream of bytes to a stream of strings. We then attach the result to the `text.lines` pipe, which buffers strings and emits full lines. Pipes are expressed using the type `Pipe[F,I,O]`, which describes a pipe that can accept input values of type `I` and can output values of type `O`, potentially evaluating an effect periodically. @@ -97,7 +96,7 @@ val encodedBytes: Stream[IO, Byte] = withNewlines.through(text.utf8Encode) We then write the encoded bytes to a file. Note that nothing has happened at this point -- we are just constructing a description of a computation that, when interpreted, will incrementally consume the stream, sending converted values to the specified file. ```tut -val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext)) +val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker)) ``` There are a number of ways of interpreting the stream. In this case, we call `compile.drain`, which returns a val value of the effect type, `IO`. The output of the stream is ignored - we compile it solely for its effect. @@ -108,8 +107,8 @@ val task: IO[Unit] = written.compile.drain We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. In this example, we extended `IOApp`, which lets us express our overall program as an `IO[ExitCase]`. The `IOApp` class handles running the task and hooking it up to the application entry point. -Let's shut down the ExecutionContext that we allocated earlier. +Let's shut down the thread pool that we allocated earlier -- reminder: in real code, we would not manually control the lifecycle of the blocking thread pool -- we'd use the resource returned from `Blocker[IO]` to manage it automatically, like in the full example we started with. ```tut -blockingExecutionContext.shutdown() +blockingPool.shutdown() ``` diff --git a/io/src/main/scala/fs2/io/file/FileHandle.scala b/io/src/main/scala/fs2/io/file/FileHandle.scala index 0ebd4587e3..2ad74c632d 100644 --- a/io/src/main/scala/fs2/io/file/FileHandle.scala +++ b/io/src/main/scala/fs2/io/file/FileHandle.scala @@ -2,12 +2,10 @@ package fs2 package io package file -import scala.concurrent.ExecutionContext - import java.nio.ByteBuffer import java.nio.channels.{FileChannel, FileLock} -import cats.effect.{ContextShift, Sync} +import cats.effect.{Blocker, ContextShift, Sync} /** * Provides the ability to read/write/lock/inspect a file in the effect `F`. @@ -95,27 +93,23 @@ private[file] object FileHandle { /** * Creates a `FileHandle[F]` from a `java.nio.channels.FileChannel`. */ - private[file] def fromFileChannel[F[_]](chan: FileChannel, - blockingExecutionContext: ExecutionContext)( + private[file] def fromFileChannel[F[_]](chan: FileChannel, blocker: Blocker)( implicit F: Sync[F], cs: ContextShift[F]): FileHandle[F] = new FileHandle[F] { type Lock = FileLock - private def doBlocking[A](a: => A): F[A] = - blockingDelay(blockingExecutionContext)(a) - override def force(metaData: Boolean): F[Unit] = - doBlocking(chan.force(metaData)) + blocker.delay(chan.force(metaData)) override def lock: F[Lock] = - doBlocking(chan.lock) + blocker.delay(chan.lock) override def lock(position: Long, size: Long, shared: Boolean): F[Lock] = - doBlocking(chan.lock(position, size, shared)) + blocker.delay(chan.lock(position, size, shared)) override def read(numBytes: Int, offset: Long): F[Option[Chunk[Byte]]] = - doBlocking { + blocker.delay { val buf = ByteBuffer.allocate(numBytes) val len = chan.read(buf, offset) if (len < 0) None @@ -124,21 +118,21 @@ private[file] object FileHandle { } override def size: F[Long] = - doBlocking(chan.size) + blocker.delay(chan.size) override def truncate(size: Long): F[Unit] = - doBlocking { chan.truncate(size); () } + blocker.delay { chan.truncate(size); () } override def tryLock: F[Option[Lock]] = - doBlocking(Option(chan.tryLock())) + blocker.delay(Option(chan.tryLock())) override def tryLock(position: Long, size: Long, shared: Boolean): F[Option[Lock]] = - doBlocking(Option(chan.tryLock(position, size, shared))) + blocker.delay(Option(chan.tryLock(position, size, shared))) override def unlock(f: Lock): F[Unit] = - doBlocking(f.release()) + blocker.delay(f.release()) override def write(bytes: Chunk[Byte], offset: Long): F[Int] = - doBlocking(chan.write(bytes.toBytes.toByteBuffer, offset)) + blocker.delay(chan.write(bytes.toBytes.toByteBuffer, offset)) } } diff --git a/io/src/main/scala/fs2/io/file/Watcher.scala b/io/src/main/scala/fs2/io/file/Watcher.scala index ddc3ad45f6..d592faad72 100644 --- a/io/src/main/scala/fs2/io/file/Watcher.scala +++ b/io/src/main/scala/fs2/io/file/Watcher.scala @@ -2,7 +2,6 @@ package fs2 package io import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import cats.effect._ @@ -127,20 +126,18 @@ object Watcher { } /** Creates a watcher for the default file system. */ - def default[F[_]](blockingExecutionContext: ExecutionContext)( - implicit F: Concurrent[F], - cs: ContextShift[F]): Resource[F, Watcher[F]] = + def default[F[_]](blocker: Blocker)(implicit F: Concurrent[F], + cs: ContextShift[F]): Resource[F, Watcher[F]] = Resource - .liftF(blockingDelay(blockingExecutionContext)(FileSystems.getDefault)) - .flatMap(fromFileSystem(blockingExecutionContext, _)) + .liftF(blocker.delay(FileSystems.getDefault)) + .flatMap(fromFileSystem(blocker, _)) /** Creates a watcher for the supplied file system. */ - def fromFileSystem[F[_]](blockingExecutionContext: ExecutionContext, fs: FileSystem)( + def fromFileSystem[F[_]](blocker: Blocker, fs: FileSystem)( implicit F: Concurrent[F], cs: ContextShift[F]): Resource[F, Watcher[F]] = - Resource(blockingDelay(blockingExecutionContext)(fs.newWatchService).flatMap { ws => - fromWatchService(blockingExecutionContext, ws).map(w => - w -> blockingDelay(blockingExecutionContext)(ws.close)) + Resource(blocker.delay(fs.newWatchService).flatMap { ws => + fromWatchService(blocker, ws).map(w => w -> blocker.delay(ws.close)) }) private case class Registration[F[_]](types: Seq[EventType], @@ -151,27 +148,27 @@ object Watcher { cleanup: F[Unit]) /** Creates a watcher for the supplied NIO `WatchService`. */ - def fromWatchService[F[_]](blockingExecutionContext: ExecutionContext, ws: WatchService)( + def fromWatchService[F[_]](blocker: Blocker, ws: WatchService)( implicit F: Concurrent[F], cs: ContextShift[F]): F[Watcher[F]] = SignallingRef[F, Map[WatchKey, Registration[F]]](Map.empty) - .map(new DefaultWatcher(blockingExecutionContext, ws, _)) + .map(new DefaultWatcher(blocker, ws, _)) private class DefaultWatcher[F[_]]( - blockingExecutionContext: ExecutionContext, + blocker: Blocker, ws: WatchService, registrations: SignallingRef[F, Map[WatchKey, Registration[F]]])(implicit F: Concurrent[F], cs: ContextShift[F]) extends Watcher[F] { private def isDir(p: Path): F[Boolean] = - blockingDelay(blockingExecutionContext)(Files.isDirectory(p)) + blocker.delay(Files.isDirectory(p)) private def track(key: WatchKey, r: Registration[F]): F[F[Unit]] = registrations .update(_.updated(key, r)) .as { - blockingDelay(blockingExecutionContext)(key.cancel) >> registrations.modify { s => + blocker.delay(key.cancel) >> registrations.modify { s => (s - key) -> s.get(key).map(_.cleanup).getOrElse(F.unit) }.flatten } @@ -193,7 +190,7 @@ object Watcher { false) else if (types.contains(EventType.Created)) (types, false) else (EventType.Created +: types, true) - val dirs: F[List[Path]] = blockingDelay(blockingExecutionContext) { + val dirs: F[List[Path]] = blocker.delay { var dirs: List[Path] = Nil Files.walkFileTree(path, new SimpleFileVisitor[Path] { override def preVisitDirectory(path: Path, attrs: BasicFileAttributes) = { @@ -233,7 +230,7 @@ object Watcher { private def registerUntracked(path: Path, types: Seq[Watcher.EventType], modifiers: Seq[WatchEvent.Modifier]): F[WatchKey] = - blockingDelay(blockingExecutionContext) { + blocker.delay { val typesWithDefaults = if (types.isEmpty) List(EventType.Created, EventType.Deleted, EventType.Modified, EventType.Overflow) @@ -256,11 +253,12 @@ object Watcher { if (reg.map(_.recurse).getOrElse(false)) { val created = events.collect { case Event.Created(p, _) => p } def watchIfDirectory(p: Path): F[(F[Unit], List[Event])] = - blockingDelay(blockingExecutionContext)(Files.isDirectory(p)) + blocker + .delay(Files.isDirectory(p)) .ifM( watch(p, Seq(EventType.Created), reg.map(_.modifiers).getOrElse(Nil)).flatMap { cancel => - val events: F[List[Event]] = blockingDelay(blockingExecutionContext) { + val events: F[List[Event]] = blocker.delay { var dirs: List[Path] = Nil Files.list(p).forEach(d => dirs = d :: dirs) dirs.map(Event.Created(_, 1)) @@ -290,7 +288,7 @@ object Watcher { private def unfilteredEvents( pollTimeout: FiniteDuration): Stream[F, (WatchKey, List[Event])] = { - val poll: F[Option[(WatchKey, List[Event])]] = blockingDelay(blockingExecutionContext) { + val poll: F[Option[(WatchKey, List[Event])]] = blocker.delay { val key = ws.poll(pollTimeout.toMillis, TimeUnit.MILLISECONDS) if (key eq null) None else { diff --git a/io/src/main/scala/fs2/io/file/file.scala b/io/src/main/scala/fs2/io/file/file.scala index 85c7411699..5aac2e4d4f 100644 --- a/io/src/main/scala/fs2/io/file/file.scala +++ b/io/src/main/scala/fs2/io/file/file.scala @@ -1,12 +1,11 @@ package fs2 package io -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import java.nio.file.{Path, StandardOpenOption, WatchEvent} -import cats.effect.{Concurrent, ContextShift, Resource, Sync} +import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync} /** Provides support for working with files. */ package object file { @@ -14,12 +13,10 @@ package object file { /** * Reads all data synchronously from the file at the specified `java.nio.file.Path`. */ - def readAll[F[_]: Sync: ContextShift](path: Path, - blockingExecutionContext: ExecutionContext, - chunkSize: Int)( + def readAll[F[_]: Sync: ContextShift](path: Path, blocker: Blocker, chunkSize: Int)( ): Stream[F, Byte] = pulls - .fromPath(path, blockingExecutionContext, List(StandardOpenOption.READ)) + .fromPath(path, blocker, List(StandardOpenOption.READ)) .flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c.resource)) .stream @@ -29,13 +26,13 @@ package object file { * two bytes are read. */ def readRange[F[_]: Sync: ContextShift](path: Path, - blockingExecutionContext: ExecutionContext, + blocker: Blocker, chunkSize: Int, start: Long, end: Long)( ): Stream[F, Byte] = pulls - .fromPath(path, blockingExecutionContext, List(StandardOpenOption.READ)) + .fromPath(path, blocker, List(StandardOpenOption.READ)) .flatMap(c => pulls.readRangeFromFileHandle(chunkSize, start, end)(c.resource)) .stream @@ -46,14 +43,12 @@ package object file { */ def writeAll[F[_]: Sync: ContextShift]( path: Path, - blockingExecutionContext: ExecutionContext, + blocker: Blocker, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE) ): Pipe[F, Byte, Unit] = in => (for { - out <- pulls.fromPath(path, - blockingExecutionContext, - StandardOpenOption.WRITE :: flags.toList) + out <- pulls.fromPath(path, blocker, StandardOpenOption.WRITE :: flags.toList) fileHandle = out.resource offset <- if (flags.contains(StandardOpenOption.APPEND)) Pull.eval(fileHandle.size) else Pull.pure(0L) @@ -87,9 +82,8 @@ package object file { * * @return singleton bracketed stream returning a watcher */ - def watcher[F[_]: Concurrent: ContextShift]( - blockingExecutionContext: ExecutionContext): Resource[F, Watcher[F]] = - Watcher.default(blockingExecutionContext) + def watcher[F[_]: Concurrent: ContextShift](blocker: Blocker): Resource[F, Watcher[F]] = + Watcher.default(blocker) /** * Watches a single path. @@ -97,12 +91,12 @@ package object file { * Alias for creating a watcher and watching the supplied path, releasing the watcher when the resulting stream is finalized. */ def watch[F[_]: Concurrent: ContextShift]( - blockingExecutionContext: ExecutionContext, + blocker: Blocker, path: Path, types: Seq[Watcher.EventType] = Nil, modifiers: Seq[WatchEvent.Modifier] = Nil, pollTimeout: FiniteDuration = 1.second): Stream[F, Watcher.Event] = Stream - .resource(Watcher.default(blockingExecutionContext)) + .resource(Watcher.default(blocker)) .flatMap(w => Stream.eval_(w.watch(path, types, modifiers)) ++ w.events(pollTimeout)) } diff --git a/io/src/main/scala/fs2/io/file/pulls.scala b/io/src/main/scala/fs2/io/file/pulls.scala index e1dabab858..324c7bc711 100644 --- a/io/src/main/scala/fs2/io/file/pulls.scala +++ b/io/src/main/scala/fs2/io/file/pulls.scala @@ -2,12 +2,10 @@ package fs2 package io package file -import scala.concurrent.ExecutionContext - import java.nio.channels._ import java.nio.file._ -import cats.effect.{ContextShift, Sync} +import cats.effect.{Blocker, ContextShift, Sync} /** Provides various `Pull`s for working with files. */ object pulls { @@ -81,23 +79,20 @@ object pulls { * * The `Pull` closes the acquired `java.nio.channels.FileChannel` when it is done. */ - def fromPath[F[_]](path: Path, - blockingExecutionContext: ExecutionContext, - flags: Seq[OpenOption])( + def fromPath[F[_]](path: Path, blocker: Blocker, flags: Seq[OpenOption])( implicit F: Sync[F], cs: ContextShift[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] = - fromFileChannel(blockingDelay(blockingExecutionContext)(FileChannel.open(path, flags: _*)), - blockingExecutionContext) + fromFileChannel(blocker.delay(FileChannel.open(path, flags: _*)), blocker) /** * Given a `java.nio.channels.FileChannel`, will create a `Pull` which allows synchronous operations against the underlying file. * * The `Pull` closes the provided `java.nio.channels.FileChannel` when it is done. */ - def fromFileChannel[F[_]](channel: F[FileChannel], blockingExecutionContext: ExecutionContext)( + def fromFileChannel[F[_]](channel: F[FileChannel], blocker: Blocker)( implicit F: Sync[F], cs: ContextShift[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] = Pull - .acquireCancellable(channel)(ch => blockingDelay(blockingExecutionContext)(ch.close())) - .map(_.map(FileHandle.fromFileChannel[F](_, blockingExecutionContext))) + .acquireCancellable(channel)(ch => blocker.delay(ch.close())) + .map(_.map(FileHandle.fromFileChannel[F](_, blocker))) } diff --git a/io/src/main/scala/fs2/io/io.scala b/io/src/main/scala/fs2/io/io.scala index 235daa47f7..213efba6f5 100644 --- a/io/src/main/scala/fs2/io/io.scala +++ b/io/src/main/scala/fs2/io/io.scala @@ -1,20 +1,17 @@ package fs2 -import cats.effect.{Async, ConcurrentEffect, ContextShift, Resource, Sync} +import cats.effect.{Async, Blocker, ConcurrentEffect, ContextShift, Resource, Sync} import cats._ import cats.implicits._ import java.io.{InputStream, OutputStream} import java.nio.charset.Charset -import scala.concurrent.ExecutionContext - /** * Provides various ways to work with streams that perform IO. * - * These methods accept a blocking `ExecutionContext`, as the underlying - * implementations perform blocking IO. The recommendation is to use an - * unbounded thread pool with application level bounds. + * These methods accept a `cats.effect.Blocker`, as the underlying + * implementations perform blocking IO. * * @see [[https://typelevel.org/cats-effect/concurrency/basics.html#blocking-threads]] */ @@ -28,12 +25,12 @@ package object io { def readInputStream[F[_]]( fis: F[InputStream], chunkSize: Int, - blockingExecutionContext: ExecutionContext, + blocker: Blocker, closeAfterUse: Boolean = true)(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, Byte] = readInputStreamGeneric( fis, F.delay(new Array[Byte](chunkSize)), - blockingExecutionContext, + blocker, closeAfterUse ) @@ -52,21 +49,19 @@ package object io { def unsafeReadInputStream[F[_]]( fis: F[InputStream], chunkSize: Int, - blockingExecutionContext: ExecutionContext, + blocker: Blocker, closeAfterUse: Boolean = true)(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, Byte] = readInputStreamGeneric( fis, F.pure(new Array[Byte](chunkSize)), - blockingExecutionContext, + blocker, closeAfterUse ) - private def readBytesFromInputStream[F[_]](is: InputStream, - buf: Array[Byte], - blockingExecutionContext: ExecutionContext)( + private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte], blocker: Blocker)( implicit F: Sync[F], cs: ContextShift[F]): F[Option[Chunk[Byte]]] = - blockingDelay(blockingExecutionContext)(is.read(buf)).map { numBytes => + blocker.delay(is.read(buf)).map { numBytes => if (numBytes < 0) None else if (numBytes == 0) Some(Chunk.empty) else if (numBytes < buf.size) Some(Chunk.bytes(buf.slice(0, numBytes))) @@ -76,17 +71,17 @@ package object io { private def readInputStreamGeneric[F[_]]( fis: F[InputStream], buf: F[Array[Byte]], - blockingExecutionContext: ExecutionContext, + blocker: Blocker, closeAfterUse: Boolean)(implicit F: Sync[F], cs: ContextShift[F]): Stream[F, Byte] = { def useIs(is: InputStream) = Stream - .eval(buf.flatMap(b => readBytesFromInputStream(is, b, blockingExecutionContext))) + .eval(buf.flatMap(b => readBytesFromInputStream(is, b, blocker))) .repeat .unNoneTerminate .flatMap(c => Stream.chunk(c)) if (closeAfterUse) - Stream.bracket(fis)(is => F.delay(is.close())).flatMap(useIs) + Stream.bracket(fis)(is => blocker.delay(is.close())).flatMap(useIs) else Stream.eval(fis).flatMap(useIs) } @@ -99,43 +94,30 @@ package object io { * blocking so the execution context should be configured appropriately. */ def writeOutputStream[F[_]](fos: F[OutputStream], - blockingExecutionContext: ExecutionContext, + blocker: Blocker, closeAfterUse: Boolean = true)( implicit F: Sync[F], cs: ContextShift[F]): Pipe[F, Byte, Unit] = s => { def useOs(os: OutputStream): Stream[F, Unit] = - s.chunks.evalMap(c => writeBytesToOutputStream(os, c, blockingExecutionContext)) - - def close(os: OutputStream): F[Unit] = - blockingDelay(blockingExecutionContext)(os.close()) + s.chunks.evalMap(c => blocker.delay(os.write(c.toArray))) - val os = if (closeAfterUse) Stream.bracket(fos)(close) else Stream.eval(fos) - os.flatMap(os => - useOs(os).scope ++ Stream.eval(cs.evalOn(blockingExecutionContext)(F.delay(os.flush())))) + val os = + if (closeAfterUse) Stream.bracket(fos)(os => blocker.delay(os.close())) + else Stream.eval(fos) + os.flatMap(os => useOs(os).scope ++ Stream.eval(blocker.delay(os.flush()))) } - private def writeBytesToOutputStream[F[_]](os: OutputStream, - bytes: Chunk[Byte], - blockingExecutionContext: ExecutionContext)( - implicit F: Sync[F], - cs: ContextShift[F]): F[Unit] = - blockingDelay(blockingExecutionContext)(os.write(bytes.toArray)) - // // STDIN/STDOUT Helpers /** Stream of bytes read asynchronously from standard input. */ - def stdin[F[_]](bufSize: Int, blockingExecutionContext: ExecutionContext)( - implicit F: Sync[F], - cs: ContextShift[F]): Stream[F, Byte] = - readInputStream(F.delay(System.in), bufSize, blockingExecutionContext, false) + def stdin[F[_]: Sync: ContextShift](bufSize: Int, blocker: Blocker): Stream[F, Byte] = + readInputStream(blocker.delay(System.in), bufSize, blocker, false) /** Pipe of bytes that writes emitted values to standard output asynchronously. */ - def stdout[F[_]](blockingExecutionContext: ExecutionContext)( - implicit F: Sync[F], - cs: ContextShift[F]): Pipe[F, Byte, Unit] = - writeOutputStream(F.delay(System.out), blockingExecutionContext, false) + def stdout[F[_]: Sync: ContextShift](blocker: Blocker): Pipe[F, Byte, Unit] = + writeOutputStream(blocker.delay(System.out), blocker, false) /** * Writes this stream to standard output asynchronously, converting each element to @@ -144,17 +126,14 @@ package object io { * Each write operation is performed on the supplied execution context. Writes are * blocking so the execution context should be configured appropriately. */ - def stdoutLines[F[_], O](blockingExecutionContext: ExecutionContext, - charset: Charset = utf8Charset)(implicit F: Sync[F], - cs: ContextShift[F], - show: Show[O]): Pipe[F, O, Unit] = - _.map(_.show).through(text.encode(charset)).through(stdout(blockingExecutionContext)) + def stdoutLines[F[_]: Sync: ContextShift, O: Show]( + blocker: Blocker, + charset: Charset = utf8Charset): Pipe[F, O, Unit] = + _.map(_.show).through(text.encode(charset)).through(stdout(blocker)) /** Stream of `String` read asynchronously from standard input decoded in UTF-8. */ - def stdinUtf8[F[_]](bufSize: Int, blockingExecutionContext: ExecutionContext)( - implicit F: Sync[F], - cs: ContextShift[F]): Stream[F, String] = - stdin(bufSize, blockingExecutionContext).through(text.utf8Decode) + def stdinUtf8[F[_]: Sync: ContextShift](bufSize: Int, blocker: Blocker): Stream[F, String] = + stdin(bufSize, blocker).through(text.utf8Decode) /** * Pipe that converts a stream of bytes to a stream that will emits a single `java.io.InputStream`, @@ -179,14 +158,6 @@ package object io { implicit F: ConcurrentEffect[F]): Resource[F, InputStream] = JavaInputOutputStream.toInputStream(source) - /** - * Like `Sync#delay` but evaluates the thunk on the supplied execution context - * and then shifts back to the default thread pool. - */ - private[io] def blockingDelay[F[_], A](blockingExecutionContext: ExecutionContext)( - thunk: => A)(implicit F: Sync[F], cs: ContextShift[F]): F[A] = - cs.evalOn(blockingExecutionContext)(F.delay(thunk)) - private[io] def asyncYield[F[_], A]( k: (Either[Throwable, A] => Unit) => Unit)(implicit F: Async[F], cs: ContextShift[F]): F[A] = F.guarantee(F.async(k))(cs.shift) diff --git a/io/src/main/scala/fs2/io/tcp/SocketGroup.scala b/io/src/main/scala/fs2/io/tcp/SocketGroup.scala index 08d0b6b1ae..2cfd7c5aec 100644 --- a/io/src/main/scala/fs2/io/tcp/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/tcp/SocketGroup.scala @@ -2,7 +2,6 @@ package fs2 package io package tcp -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import java.net.{InetSocketAddress, SocketAddress, StandardSocketOptions} @@ -19,7 +18,7 @@ import java.nio.channels.spi.AsynchronousChannelProvider import java.util.concurrent.{ThreadFactory, TimeUnit} import cats.implicits._ -import cats.effect.{Concurrent, ContextShift, Resource, Sync} +import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync} import cats.effect.concurrent.{Ref, Semaphore} import fs2.internal.ThreadFactories @@ -28,8 +27,7 @@ import fs2.internal.ThreadFactories * Resource that provides the ability to open client and server TCP sockets that all share * an underlying non-blocking channel group. */ -final class SocketGroup(channelGroup: AsynchronousChannelGroup, - blockingExecutionContext: ExecutionContext) { +final class SocketGroup(channelGroup: AsynchronousChannelGroup, blocker: Blocker) { /** * Stream that connects to the specified server and emits a single socket, @@ -52,7 +50,7 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, noDelay: Boolean = false )(implicit F: Concurrent[F], CS: ContextShift[F]): Resource[F, Socket[F]] = { - def setup: F[AsynchronousSocketChannel] = blockingDelay(blockingExecutionContext) { + def setup: F[AsynchronousSocketChannel] = blocker.delay { val ch = AsynchronousChannelProvider.provider().openAsynchronousSocketChannel(channelGroup) ch.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, reuseAddress) @@ -123,7 +121,7 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, CS: ContextShift[F] ): Stream[F, Either[InetSocketAddress, Resource[F, Socket[F]]]] = { - val setup: F[AsynchronousServerSocketChannel] = blockingDelay(blockingExecutionContext) { + val setup: F[AsynchronousServerSocketChannel] = blocker.delay { val ch = AsynchronousChannelProvider .provider() .openAsynchronousServerSocketChannel(channelGroup) @@ -134,7 +132,7 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, } def cleanup(sch: AsynchronousServerSocketChannel): F[Unit] = - blockingDelay(blockingExecutionContext)(if (sch.isOpen) sch.close()) + blocker.delay(if (sch.isOpen) sch.close()) def acceptIncoming(sch: AsynchronousServerSocketChannel): Stream[F, Resource[F, Socket[F]]] = { def go: Stream[F, Resource[F, Socket[F]]] = { @@ -159,7 +157,7 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, go.handleErrorWith { case err: AsynchronousCloseException => - Stream.eval(blockingDelay(blockingExecutionContext)(sch.isOpen)).flatMap { isOpen => + Stream.eval(blocker.delay(sch.isOpen)).flatMap { isOpen => if (isOpen) Stream.raiseError[F](err) else Stream.empty } @@ -312,22 +310,21 @@ final class SocketGroup(channelGroup: AsynchronousChannelGroup, } def localAddress: F[SocketAddress] = - blockingDelay(blockingExecutionContext)(ch.getLocalAddress) + blocker.delay(ch.getLocalAddress) def remoteAddress: F[SocketAddress] = - blockingDelay(blockingExecutionContext)(ch.getRemoteAddress) - def isOpen: F[Boolean] = blockingDelay(blockingExecutionContext)(ch.isOpen) - def close: F[Unit] = blockingDelay(blockingExecutionContext)(ch.close()) - def endOfOutput: F[Unit] = blockingDelay(blockingExecutionContext) { + blocker.delay(ch.getRemoteAddress) + def isOpen: F[Boolean] = blocker.delay(ch.isOpen) + def close: F[Unit] = blocker.delay(ch.close()) + def endOfOutput: F[Unit] = blocker.delay { ch.shutdownOutput(); () } - def endOfInput: F[Unit] = blockingDelay(blockingExecutionContext) { + def endOfInput: F[Unit] = blocker.delay { ch.shutdownInput(); () } } } } - Resource.make(socket)(_ => - blockingDelay(blockingExecutionContext)(if (ch.isOpen) ch.close else ()).attempt.void) + Resource.make(socket)(_ => blocker.delay(if (ch.isOpen) ch.close else ()).attempt.void) } } @@ -336,7 +333,7 @@ object SocketGroup { /** * Creates a `SocketGroup`. * - * The supplied `blockingExecutionContext` is used for networking calls other than + * The supplied `blocker` is used for networking calls other than * reads/writes. All reads and writes are performed on a non-blocking thread pool * associated with the `SocketGroup`. The non-blocking thread pool is sized to * the number of available processors but that can be overridden by supplying @@ -345,16 +342,16 @@ object SocketGroup { * information on NIO thread pooling. */ def apply[F[_]: Sync: ContextShift]( - blockingExecutionContext: ExecutionContext, + blocker: Blocker, nonBlockingThreadCount: Int = 0, nonBlockingThreadFactory: ThreadFactory = ThreadFactories.named("fs2-socket-group-blocking", true)): Resource[F, SocketGroup] = - Resource(blockingDelay(blockingExecutionContext) { + Resource(blocker.delay { val threadCount = if (nonBlockingThreadCount <= 0) Runtime.getRuntime.availableProcessors else nonBlockingThreadCount val acg = AsynchronousChannelGroup.withFixedThreadPool(threadCount, nonBlockingThreadFactory) - val group = new SocketGroup(acg, blockingExecutionContext) - (group, blockingDelay(blockingExecutionContext)(acg.shutdown())) + val group = new SocketGroup(acg, blocker) + (group, blocker.delay(acg.shutdown())) }) } diff --git a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala index e51d219cc7..17b1af8faf 100644 --- a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala @@ -4,7 +4,6 @@ package udp import scala.collection.JavaConverters._ import scala.collection.mutable.PriorityQueue -import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration import java.io.IOException @@ -21,7 +20,7 @@ import java.nio.channels.{ import java.util.ArrayDeque import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch} -import cats.effect.{ContextShift, Resource, Sync} +import cats.effect.{Blocker, ContextShift, Resource, Sync} /** * Supports read/write operations on an arbitrary number of UDP sockets using a shared selector thread. @@ -49,10 +48,8 @@ private[udp] object AsynchronousSocketGroup { */ private class WriterPacket(val remote: InetSocketAddress, val bytes: ByteBuffer) - def apply[F[_]: Sync: ContextShift]( - blockingExecutionContext: ExecutionContext): Resource[F, AsynchronousSocketGroup] = - Resource.make(blockingDelay(blockingExecutionContext)(unsafe))(g => - blockingDelay(blockingExecutionContext)(g.close())) + def apply[F[_]: Sync: ContextShift](blocker: Blocker): Resource[F, AsynchronousSocketGroup] = + Resource.make(blocker.delay(unsafe))(g => blocker.delay(g.close())) private def unsafe: AsynchronousSocketGroup = new AsynchronousSocketGroup { diff --git a/io/src/main/scala/fs2/io/udp/SocketGroup.scala b/io/src/main/scala/fs2/io/udp/SocketGroup.scala index 2dc212ac4d..00f8163752 100644 --- a/io/src/main/scala/fs2/io/udp/SocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/SocketGroup.scala @@ -2,7 +2,6 @@ package fs2 package io package udp -import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration import java.net.{ @@ -14,12 +13,12 @@ import java.net.{ } import java.nio.channels.{ClosedChannelException, DatagramChannel} -import cats.effect.{Concurrent, ContextShift, Resource, Sync} +import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync} import cats.implicits._ final class SocketGroup( asg: AsynchronousSocketGroup, - blockingExecutionContext: ExecutionContext + blocker: Blocker ) { /** @@ -46,7 +45,7 @@ final class SocketGroup( multicastTTL: Option[Int] = None, multicastLoopback: Boolean = true )(implicit F: Concurrent[F], CS: ContextShift[F]): Resource[F, Socket[F]] = { - val mkChannel = blockingDelay(blockingExecutionContext) { + val mkChannel = blocker.delay { val channel = protocolFamily .map { pf => DatagramChannel.open(pf) @@ -76,7 +75,7 @@ final class SocketGroup( private[udp] def mkSocket[F[_]](channel: DatagramChannel)(implicit F: Concurrent[F], cs: ContextShift[F]): F[Socket[F]] = - blockingDelay(blockingExecutionContext) { + blocker.delay { new Socket[F] { private val ctx = asg.register(channel) @@ -97,17 +96,17 @@ final class SocketGroup( def writes(timeout: Option[FiniteDuration]): Pipe[F, Packet, Unit] = _.flatMap(p => Stream.eval(write(p, timeout))) - def close: F[Unit] = blockingDelay(blockingExecutionContext) { asg.close(ctx) } + def close: F[Unit] = blocker.delay { asg.close(ctx) } def join(group: InetAddress, interface: NetworkInterface): F[AnySourceGroupMembership] = - blockingDelay(blockingExecutionContext) { + blocker.delay { val membership = channel.join(group, interface) new AnySourceGroupMembership { - def drop = blockingDelay(blockingExecutionContext) { membership.drop } + def drop = blocker.delay { membership.drop } def block(source: InetAddress) = F.delay { membership.block(source); () } - def unblock(source: InetAddress) = blockingDelay(blockingExecutionContext) { + def unblock(source: InetAddress) = blocker.delay { membership.unblock(source); () } override def toString = "AnySourceGroupMembership" @@ -119,7 +118,7 @@ final class SocketGroup( source: InetAddress): F[GroupMembership] = F.delay { val membership = channel.join(group, interface, source) new GroupMembership { - def drop = blockingDelay(blockingExecutionContext) { membership.drop } + def drop = blocker.delay { membership.drop } override def toString = "GroupMembership" } } @@ -133,8 +132,6 @@ final class SocketGroup( object SocketGroup { - def apply[F[_]: Sync: ContextShift]( - blockingExecutionContext: ExecutionContext): Resource[F, SocketGroup] = - AsynchronousSocketGroup[F](blockingExecutionContext).map(asg => - new SocketGroup(asg, blockingExecutionContext)) + def apply[F[_]: Sync: ContextShift](blocker: Blocker): Resource[F, SocketGroup] = + AsynchronousSocketGroup[F](blocker).map(asg => new SocketGroup(asg, blocker)) } diff --git a/io/src/test/scala/fs2/io/IoSpec.scala b/io/src/test/scala/fs2/io/IoSpec.scala index d4391f391d..9afa750904 100644 --- a/io/src/test/scala/fs2/io/IoSpec.scala +++ b/io/src/test/scala/fs2/io/IoSpec.scala @@ -1,7 +1,7 @@ package fs2.io import java.io.{ByteArrayInputStream, InputStream} -import cats.effect.IO +import cats.effect.{Blocker, IO} import fs2.Fs2Spec class IoSpec extends Fs2Spec { @@ -9,8 +9,8 @@ class IoSpec extends Fs2Spec { "non-buffered" in forAll(arrayGenerator[Byte], intsBetween(1, 20)) { (bytes: Array[Byte], chunkSize: Int) => val is: InputStream = new ByteArrayInputStream(bytes) - blockingExecutionContext.use { ec => - val stream = readInputStream(IO(is), chunkSize, ec) + Blocker[IO].use { blocker => + val stream = readInputStream(IO(is), chunkSize, blocker) stream.compile.toVector.asserting(_.toArray shouldBe bytes) } } @@ -18,8 +18,8 @@ class IoSpec extends Fs2Spec { "buffered" in forAll(arrayGenerator[Byte], intsBetween(1, 20)) { (bytes: Array[Byte], chunkSize: Int) => val is: InputStream = new ByteArrayInputStream(bytes) - blockingExecutionContext.use { ec => - val stream = readInputStream(IO(is), chunkSize, ec) + Blocker[IO].use { blocker => + val stream = readInputStream(IO(is), chunkSize, blocker) stream.buffer(chunkSize * 2).compile.toVector.asserting(_.toArray shouldBe bytes) } } @@ -29,8 +29,8 @@ class IoSpec extends Fs2Spec { "non-buffered" in forAll(arrayGenerator[Byte], intsBetween(1, 20)) { (bytes: Array[Byte], chunkSize: Int) => val is: InputStream = new ByteArrayInputStream(bytes) - blockingExecutionContext.use { ec => - val stream = unsafeReadInputStream(IO(is), chunkSize, ec) + Blocker[IO].use { blocker => + val stream = unsafeReadInputStream(IO(is), chunkSize, blocker) stream.compile.toVector.asserting(_.toArray shouldBe bytes) } } diff --git a/io/src/test/scala/fs2/io/file/FileSpec.scala b/io/src/test/scala/fs2/io/file/FileSpec.scala index e6bfc0ae10..7e30495237 100644 --- a/io/src/test/scala/fs2/io/file/FileSpec.scala +++ b/io/src/test/scala/fs2/io/file/FileSpec.scala @@ -4,7 +4,7 @@ package file import java.nio.file.StandardOpenOption -import cats.effect.IO +import cats.effect.{Blocker, IO} import cats.implicits._ class FileSpec extends BaseFileSpec { @@ -12,7 +12,7 @@ class FileSpec extends BaseFileSpec { "readAll" - { "retrieves whole content of a file" in { Stream - .resource(blockingExecutionContext) + .resource(Blocker[IO]) .flatMap { bec => tempFile .flatTap(modify) @@ -28,7 +28,7 @@ class FileSpec extends BaseFileSpec { "readRange" - { "reads half of a file" in { Stream - .resource(blockingExecutionContext) + .resource(Blocker[IO]) .flatMap { bec => tempFile .flatTap(modify) @@ -42,7 +42,7 @@ class FileSpec extends BaseFileSpec { "reads full file if end is bigger than file size" in { Stream - .resource(blockingExecutionContext) + .resource(Blocker[IO]) .flatMap { bec => tempFile .flatTap(modify) @@ -58,7 +58,7 @@ class FileSpec extends BaseFileSpec { "writeAll" - { "simple write" in { Stream - .resource(blockingExecutionContext) + .resource(Blocker[IO]) .flatMap { bec => tempFile .flatMap( @@ -76,7 +76,7 @@ class FileSpec extends BaseFileSpec { "append" in { Stream - .resource(blockingExecutionContext) + .resource(Blocker[IO]) .flatMap { bec => tempFile .flatMap { path => diff --git a/io/src/test/scala/fs2/io/file/WatcherSpec.scala b/io/src/test/scala/fs2/io/file/WatcherSpec.scala index 6edec65b8a..136205547d 100644 --- a/io/src/test/scala/fs2/io/file/WatcherSpec.scala +++ b/io/src/test/scala/fs2/io/file/WatcherSpec.scala @@ -4,7 +4,7 @@ package file import scala.concurrent.duration._ -import cats.effect.IO +import cats.effect.{Blocker, IO} import cats.implicits._ import java.nio.file._ @@ -14,7 +14,7 @@ class WatcherSpec extends BaseFileSpec { "for modifications" in { tempFile .flatMap { f => - Stream.resource(blockingExecutionContext).flatMap { bec => + Stream.resource(Blocker[IO]).flatMap { bec => file .watch[IO](bec, f, modifiers = modifiers) .takeWhile({ @@ -30,7 +30,7 @@ class WatcherSpec extends BaseFileSpec { "for deletions" in { tempFile .flatMap { f => - Stream.resource(blockingExecutionContext).flatMap { bec => + Stream.resource(Blocker[IO]).flatMap { bec => file .watch[IO](bec, f, modifiers = modifiers) .takeWhile({ @@ -52,7 +52,7 @@ class WatcherSpec extends BaseFileSpec { val a = dir.resolve("a") val b = a.resolve("b") Stream.eval(IO(Files.createDirectory(a)) >> IO(Files.write(b, Array[Byte]()))) >> - Stream.resource(blockingExecutionContext).flatMap { bec => + Stream.resource(Blocker[IO]).flatMap { bec => file .watch[IO](bec, dir, modifiers = modifiers) .takeWhile({ @@ -70,7 +70,7 @@ class WatcherSpec extends BaseFileSpec { .flatMap { dir => val a = dir.resolve("a") val b = a.resolve("b") - Stream.resource(blockingExecutionContext).flatMap { bec => + Stream.resource(Blocker[IO]).flatMap { bec => file .watch[IO](bec, dir, modifiers = modifiers) .takeWhile({ diff --git a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala index ce626ba70b..dac982bc98 100644 --- a/io/src/test/scala/fs2/io/tcp/SocketSpec.scala +++ b/io/src/test/scala/fs2/io/tcp/SocketSpec.scala @@ -5,13 +5,13 @@ package tcp import java.net.InetSocketAddress import java.net.InetAddress -import cats.effect.IO +import cats.effect.{Blocker, IO} import cats.effect.concurrent.Deferred class SocketSpec extends Fs2Spec { def mkSocketGroup: Stream[IO, SocketGroup] = - Stream.resource(blockingExecutionContext.flatMap(ec => SocketGroup[IO](ec))) + Stream.resource(Blocker[IO].flatMap(blocker => SocketGroup[IO](blocker))) "tcp" - { diff --git a/io/src/test/scala/fs2/io/udp/UdpSpec.scala b/io/src/test/scala/fs2/io/udp/UdpSpec.scala index 2d46047291..f22e9a3b00 100644 --- a/io/src/test/scala/fs2/io/udp/UdpSpec.scala +++ b/io/src/test/scala/fs2/io/udp/UdpSpec.scala @@ -13,13 +13,13 @@ import java.nio.channels.InterruptedByTimeoutException import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import cats.effect.IO +import cats.effect.{Blocker, IO} import cats.implicits._ class UdpSpec extends Fs2Spec { def mkSocketGroup: Stream[IO, SocketGroup] = - Stream.resource(blockingExecutionContext.flatMap(ec => SocketGroup(ec))) + Stream.resource(Blocker[IO].flatMap(blocker => SocketGroup(blocker))) "udp" - { "echo one" in { From 1c0f76adee59d12bc39b4eb16a0270323f4eb58d Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 12 Jun 2019 13:46:10 -0400 Subject: [PATCH 2/3] Update docs/ReadmeExample.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thanks! Co-Authored-By: Bjørn Madsen --- docs/ReadmeExample.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ReadmeExample.md b/docs/ReadmeExample.md index 5e303423f2..5b76f13a61 100644 --- a/docs/ReadmeExample.md +++ b/docs/ReadmeExample.md @@ -59,7 +59,7 @@ implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionCo // upon finalization, like in the original example. // See the whole README example for proper resource management in terms of `Blocker`. val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) -val blocker: Blocker = Blocker.unsafeFromExecutionContext(blockingPool) +val blocker: Blocker = Blocker.liftExecutionContext(blockingPool) def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0) From f5af03e0ae10eb80dec5745a6c53e3436141888c Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 12 Jun 2019 13:47:05 -0400 Subject: [PATCH 3/3] Update docs/src/ReadmeExample.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Bjørn Madsen --- docs/src/ReadmeExample.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/ReadmeExample.md b/docs/src/ReadmeExample.md index fb10208f3b..00b8310f08 100644 --- a/docs/src/ReadmeExample.md +++ b/docs/src/ReadmeExample.md @@ -51,7 +51,7 @@ implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionCo // upon finalization, like in the original example. // See the whole README example for proper resource management in terms of `Blocker`. val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) -val blocker: Blocker = Blocker.unsafeFromExecutionContext(blockingPool) +val blocker: Blocker = Blocker.liftExecutionContext(blockingPool) def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0/9.0)