Skip to content

Commit

Permalink
Merge pull request #1508 from mpilquist/topic/unblock-plus-blocker
Browse files Browse the repository at this point in the history
Replaced blocking ExecutionContext with Blocker
  • Loading branch information
mpilquist authored Jun 12, 2019
2 parents f2739af + f5af03e commit c998d15
Show file tree
Hide file tree
Showing 18 changed files with 175 additions and 266 deletions.
12 changes: 4 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,25 @@ libraryDependencies += "co.fs2" %%% "fs2-core" % "1.0.4"
FS2 is a streaming I/O library. The design goals are compositionality, expressiveness, resource safety, and speed. Here's a simple example of its use:

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object Converter extends IOApp {
private val blockingExecutionContext =
Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown()))

val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
}

def run(args: List[String]): IO[ExitCode] =
Expand Down
19 changes: 0 additions & 19 deletions core/jvm/src/test/scala/fs2/TestPlatform.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,5 @@
package fs2

import scala.concurrent.ExecutionContext

import cats.effect.{IO, Resource}
import cats.implicits._

import java.util.concurrent.Executors

import fs2.internal.ThreadFactories

trait TestPlatform {

def isJVM: Boolean = true

val blockingExecutionContext: Resource[IO, ExecutionContext] =
Resource
.make(
IO(ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(ThreadFactories.named("fs2-blocking", true)))))(ec =>
IO(ec.shutdown()))
.widen[ExecutionContext]

}
27 changes: 11 additions & 16 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import fs2.internal.{Resource => _, _}
import java.io.PrintStream

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -1572,7 +1571,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* Writes this stream of strings to the supplied `PrintStream`.
*
* Note: printing to the `PrintStream` is performed *synchronously*.
* Use `linesAsync(out, blockingEc)` if synchronous writes are a concern.
* Use `linesAsync(out, blocker)` if synchronous writes are a concern.
*/
def lines[F2[x] >: F[x]](out: PrintStream)(implicit F: Sync[F2],
ev: O <:< String): Stream[F2, Unit] = {
Expand All @@ -1586,13 +1585,13 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*
* Note: printing to the `PrintStream` is performed on the supplied blocking execution context.
*/
def linesAsync[F2[x] >: F[x]](out: PrintStream, blockingExecutionContext: ExecutionContext)(
def linesAsync[F2[x] >: F[x]](out: PrintStream, blocker: Blocker)(
implicit F: Sync[F2],
cs: ContextShift[F2],
ev: O <:< String): Stream[F2, Unit] = {
val _ = ev
val src = this.asInstanceOf[Stream[F2, String]]
src.evalMap(str => cs.evalOn(blockingExecutionContext)(F.delay(out.println(str))))
src.evalMap(str => blocker.delay(out.println(str)))
}

/**
Expand Down Expand Up @@ -2346,7 +2345,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* Writes this stream to the supplied `PrintStream`, converting each element to a `String` via `Show`.
*
* Note: printing to the `PrintStream` is performed *synchronously*.
* Use `showLinesAsync(out, blockingEc)` if synchronous writes are a concern.
* Use `showLinesAsync(out, blocker)` if synchronous writes are a concern.
*/
def showLines[F2[x] >: F[x], O2 >: O](out: PrintStream)(implicit F: Sync[F2],
showO: Show[O2]): Stream[F2, Unit] =
Expand All @@ -2357,12 +2356,10 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*
* Note: printing to the `PrintStream` is performed on the supplied blocking execution context.
*/
def showLinesAsync[F2[x] >: F[x], O2 >: O](out: PrintStream,
blockingExecutionContext: ExecutionContext)(
implicit F: Sync[F2],
cs: ContextShift[F2],
showO: Show[O2]): Stream[F2, Unit] =
covaryAll[F2, O2].map(_.show).linesAsync(out, blockingExecutionContext)
def showLinesAsync[F2[x] >: F[x]: Sync: ContextShift, O2 >: O: Show](
out: PrintStream,
blocker: Blocker): Stream[F2, Unit] =
covaryAll[F2, O2].map(_.show).linesAsync(out, blocker)

/**
* Writes this stream to standard out, converting each element to a `String` via `Show`.
Expand All @@ -2379,11 +2376,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*
* Note: printing to the `PrintStream` is performed on the supplied blocking execution context.
*/
def showLinesStdOutAsync[F2[x] >: F[x], O2 >: O](blockingExecutionContext: ExecutionContext)(
implicit F: Sync[F2],
cs: ContextShift[F2],
showO: Show[O2]): Stream[F2, Unit] =
showLinesAsync[F2, O2](Console.out, blockingExecutionContext)
def showLinesStdOutAsync[F2[x] >: F[x]: Sync: ContextShift, O2 >: O: Show](
blocker: Blocker): Stream[F2, Unit] =
showLinesAsync[F2, O2](Console.out, blocker)

/**
* Groups inputs in fixed size chunks by passing a "sliding window"
Expand Down
37 changes: 16 additions & 21 deletions docs/ReadmeExample.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
This walks through the implementation of the example given in [the README](../README.md). This program opens a file, `fahrenheit.txt`, containing temperatures in degrees fahrenheit, one per line, and converts each temperature to celsius, incrementally writing to the file `celsius.txt`. Both files will be closed, regardless of whether any errors occur.

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
// import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
// import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}

import cats.implicits._
// import cats.implicits._
Expand All @@ -15,28 +15,20 @@ import fs2.{io, text, Stream}
import java.nio.file.Paths
// import java.nio.file.Paths

import java.util.concurrent.Executors
// import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext
// import scala.concurrent.ExecutionContext

object Converter extends IOApp {
private val blockingExecutionContext =
Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown()))

val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
}

def run(args: List[String]): IO[ExitCode] =
Expand All @@ -54,17 +46,20 @@ Operations on `Stream` are defined for any choice of type constructor, not just
`fs2.io` has a number of helper functions for constructing or working with streams that talk to the outside world. `readAll` creates a stream of bytes from a file name (specified via a `java.nio.file.Path`). It encapsulates the logic for opening and closing the file, so that users of this stream do not need to remember to close the file when they are done or in the event of exceptions during processing of the stream.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

//note: this should be shut down when it's no longer necessary - normally that's at the end of your app.
//See the whole README example for proper resource management in terms of ExecutionContexts.
val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
// Note: to make these examples work in docs, we create a `Blocker` manually here but in real code,
// we should always use `Blocker[IO]`, which returns the blocker as a resource that shuts down the pool
// upon finalization, like in the original example.
// See the whole README example for proper resource management in terms of `Blocker`.
val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
val blocker: Blocker = Blocker.liftExecutionContext(blockingPool)

def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
Expand All @@ -75,7 +70,7 @@ scala> import fs2.Stream
import fs2.Stream

scala> val src: Stream[IO, Byte] =
| io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096)
| io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
src: fs2.Stream[cats.effect.IO,Byte] = Stream(..)
```

Expand Down Expand Up @@ -118,7 +113,7 @@ encodedBytes: fs2.Stream[cats.effect.IO,Byte] = Stream(..)
We then write the encoded bytes to a file. Note that nothing has happened at this point -- we are just constructing a description of a computation that, when interpreted, will incrementally consume the stream, sending converted values to the specified file.

```scala
scala> val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext))
scala> val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
written: fs2.Stream[cats.effect.IO,Unit] = Stream(..)
```

Expand All @@ -131,8 +126,8 @@ task: cats.effect.IO[Unit] = <function1>

We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. In this example, we extended `IOApp`, which lets us express our overall program as an `IO[ExitCase]`. The `IOApp` class handles running the task and hooking it up to the application entry point.

Let's shut down the ExecutionContext that we allocated earlier.
Let's shut down the thread pool that we allocated earlier -- reminder: in real code, we would not manually control the lifecycle of the blocking thread pool -- we'd use the resource returned from `Blocker[IO]` to manage it automatically, like in the full example we started with.

```scala
scala> blockingExecutionContext.shutdown()
scala> blockingPool.shutdown()
```
31 changes: 15 additions & 16 deletions docs/src/ReadmeExample.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,25 @@
This walks through the implementation of the example given in [the README](../README.md). This program opens a file, `fahrenheit.txt`, containing temperatures in degrees fahrenheit, one per line, and converts each temperature to celsius, incrementally writing to the file `celsius.txt`. Both files will be closed, regardless of whether any errors occur.

```tut:book
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
object Converter extends IOApp {
private val blockingExecutionContext =
Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown()))
val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
}
def run(args: List[String]): IO[ExitCode] =
Expand All @@ -42,17 +38,20 @@ Operations on `Stream` are defined for any choice of type constructor, not just
`fs2.io` has a number of helper functions for constructing or working with streams that talk to the outside world. `readAll` creates a stream of bytes from a file name (specified via a `java.nio.file.Path`). It encapsulates the logic for opening and closing the file, so that users of this stream do not need to remember to close the file when they are done or in the event of exceptions during processing of the stream.

```tut:silent
import cats.effect.{ContextShift, IO}
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
//note: this should be shut down when it's no longer necessary - normally that's at the end of your app.
//See the whole README example for proper resource management in terms of ExecutionContexts.
val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
// Note: to make these examples work in docs, we create a `Blocker` manually here but in real code,
// we should always use `Blocker[IO]`, which returns the blocker as a resource that shuts down the pool
// upon finalization, like in the original example.
// See the whole README example for proper resource management in terms of `Blocker`.
val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
val blocker: Blocker = Blocker.liftExecutionContext(blockingPool)
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
Expand All @@ -62,7 +61,7 @@ def fahrenheitToCelsius(f: Double): Double =
import fs2.Stream
val src: Stream[IO, Byte] =
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
```

A stream can be attached to a pipe, allowing for stateful transformations of the input values. Here, we attach the source stream to the `text.utf8Decode` pipe, which converts the stream of bytes to a stream of strings. We then attach the result to the `text.lines` pipe, which buffers strings and emits full lines. Pipes are expressed using the type `Pipe[F,I,O]`, which describes a pipe that can accept input values of type `I` and can output values of type `O`, potentially evaluating an effect periodically.
Expand Down Expand Up @@ -97,7 +96,7 @@ val encodedBytes: Stream[IO, Byte] = withNewlines.through(text.utf8Encode)
We then write the encoded bytes to a file. Note that nothing has happened at this point -- we are just constructing a description of a computation that, when interpreted, will incrementally consume the stream, sending converted values to the specified file.

```tut
val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext))
val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
```

There are a number of ways of interpreting the stream. In this case, we call `compile.drain`, which returns a val value of the effect type, `IO`. The output of the stream is ignored - we compile it solely for its effect.
Expand All @@ -108,8 +107,8 @@ val task: IO[Unit] = written.compile.drain

We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. In this example, we extended `IOApp`, which lets us express our overall program as an `IO[ExitCase]`. The `IOApp` class handles running the task and hooking it up to the application entry point.

Let's shut down the ExecutionContext that we allocated earlier.
Let's shut down the thread pool that we allocated earlier -- reminder: in real code, we would not manually control the lifecycle of the blocking thread pool -- we'd use the resource returned from `Blocker[IO]` to manage it automatically, like in the full example we started with.

```tut
blockingExecutionContext.shutdown()
blockingPool.shutdown()
```
30 changes: 12 additions & 18 deletions io/src/main/scala/fs2/io/file/FileHandle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package fs2
package io
package file

import scala.concurrent.ExecutionContext

import java.nio.ByteBuffer
import java.nio.channels.{FileChannel, FileLock}

import cats.effect.{ContextShift, Sync}
import cats.effect.{Blocker, ContextShift, Sync}

/**
* Provides the ability to read/write/lock/inspect a file in the effect `F`.
Expand Down Expand Up @@ -95,27 +93,23 @@ private[file] object FileHandle {
/**
* Creates a `FileHandle[F]` from a `java.nio.channels.FileChannel`.
*/
private[file] def fromFileChannel[F[_]](chan: FileChannel,
blockingExecutionContext: ExecutionContext)(
private[file] def fromFileChannel[F[_]](chan: FileChannel, blocker: Blocker)(
implicit F: Sync[F],
cs: ContextShift[F]): FileHandle[F] =
new FileHandle[F] {
type Lock = FileLock

private def doBlocking[A](a: => A): F[A] =
blockingDelay(blockingExecutionContext)(a)

override def force(metaData: Boolean): F[Unit] =
doBlocking(chan.force(metaData))
blocker.delay(chan.force(metaData))

override def lock: F[Lock] =
doBlocking(chan.lock)
blocker.delay(chan.lock)

override def lock(position: Long, size: Long, shared: Boolean): F[Lock] =
doBlocking(chan.lock(position, size, shared))
blocker.delay(chan.lock(position, size, shared))

override def read(numBytes: Int, offset: Long): F[Option[Chunk[Byte]]] =
doBlocking {
blocker.delay {
val buf = ByteBuffer.allocate(numBytes)
val len = chan.read(buf, offset)
if (len < 0) None
Expand All @@ -124,21 +118,21 @@ private[file] object FileHandle {
}

override def size: F[Long] =
doBlocking(chan.size)
blocker.delay(chan.size)

override def truncate(size: Long): F[Unit] =
doBlocking { chan.truncate(size); () }
blocker.delay { chan.truncate(size); () }

override def tryLock: F[Option[Lock]] =
doBlocking(Option(chan.tryLock()))
blocker.delay(Option(chan.tryLock()))

override def tryLock(position: Long, size: Long, shared: Boolean): F[Option[Lock]] =
doBlocking(Option(chan.tryLock(position, size, shared)))
blocker.delay(Option(chan.tryLock(position, size, shared)))

override def unlock(f: Lock): F[Unit] =
doBlocking(f.release())
blocker.delay(f.release())

override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
doBlocking(chan.write(bytes.toBytes.toByteBuffer, offset))
blocker.delay(chan.write(bytes.toBytes.toByteBuffer, offset))
}
}
Loading

0 comments on commit c998d15

Please sign in to comment.