Skip to content

Commit

Permalink
Merge pull request #3394 from mpilquist/topic/walk-drop-eager-optimiz…
Browse files Browse the repository at this point in the history
…ation

Remove walkEager optimization
  • Loading branch information
mpilquist committed Mar 1, 2024
2 parents e7bf3ce + b763c99 commit 52b9e5d
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 180 deletions.
52 changes: 50 additions & 2 deletions io/js/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package fs2
package io
package file

import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.Traverse
import cats.effect.kernel.{Async, Resource}
import cats.syntax.all._
import fs2.io.file.Files.UnsealedFiles
import fs2.io.internal.facade
Expand Down Expand Up @@ -369,6 +369,54 @@ private[fs2] trait FilesCompanionPlatform {
override def size(path: Path): F[Long] =
stat(path).map(_.size.toString.toLong)

override def walkWithAttributes(start: Path, options: WalkOptions): Stream[F, PathInfo] = {

def go(
start: Path,
maxDepth: Int,
ancestry: List[Either[Path, FileKey]]
): Stream[F, PathInfo] =
Stream.eval(getBasicFileAttributes(start, followLinks = false)).mask.flatMap { attr =>
Stream.emit(PathInfo(start, attr)) ++ {
if (maxDepth == 0) Stream.empty
else if (attr.isDirectory)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}
else if (attr.isSymbolicLink && options.followLinks)
Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap { attr =>
val fileKey = attr.fileKey
val isCycle = Traverse[List].existsM(ancestry) {
case Right(ancestorKey) => F.pure(fileKey.contains(ancestorKey))
case Left(ancestorPath) => isSameFile(start, ancestorPath)
}

Stream.eval(isCycle).flatMap { isCycle =>
if (!isCycle)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}
else if (options.allowCycles)
Stream.empty
else
Stream.raiseError(new FileSystemLoopException(start.toString))
}

}
else
Stream.empty
}
}

go(
start,
options.maxDepth,
Nil
)
.chunkN(options.chunkSize)
.flatMap(Stream.chunk)
}

override def writeAll(path: Path, _flags: Flags): Pipe[F, Byte, Nothing] =
in =>
in.through {
Expand Down
52 changes: 2 additions & 50 deletions io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ private[file] trait FilesCompanionPlatform {
private case class NioFileKey(value: AnyRef) extends FileKey

private final class AsyncFiles[F[_]](protected implicit val F: Async[F])
extends Files.UnsealedFiles[F]
with AsyncFilesPlatform[F] {
extends Files.UnsealedFiles[F] {

def copy(source: Path, target: Path, flags: CopyFlags): F[Unit] =
Sync[F].blocking {
Expand Down Expand Up @@ -391,61 +390,14 @@ private[file] trait FilesCompanionPlatform {
.resource(Resource.fromAutoCloseable(javaCollection))
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))

protected def walkEager(start: Path, options: WalkOptions): Stream[F, PathInfo] = {
val doWalk = Sync[F].interruptible {
val bldr = Vector.newBuilder[PathInfo]
JFiles.walkFileTree(
start.toNioPath,
if (options.followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
options.maxDepth,
new SimpleFileVisitor[JPath] {
private def enqueue(path: JPath, attrs: JBasicFileAttributes): FileVisitResult = {
bldr += PathInfo(Path.fromNioPath(path), new DelegatingBasicFileAttributes(attrs))
FileVisitResult.CONTINUE
}

override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(file, attrs)

override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
t match {
case _: FileSystemLoopException =>
if (options.allowCycles)
enqueue(
file,
JFiles.readAttributes(
file,
classOf[JBasicFileAttributes],
LinkOption.NOFOLLOW_LINKS
)
)
else throw t
case _ => FileVisitResult.CONTINUE
}

override def preVisitDirectory(
dir: JPath,
attrs: JBasicFileAttributes
): FileVisitResult =
if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(dir, attrs)

override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
if (Thread.interrupted()) FileVisitResult.TERMINATE else FileVisitResult.CONTINUE
}
)
Chunk.from(bldr.result())
}
Stream.eval(doWalk).flatMap(Stream.chunk)
}

private case class WalkEntry(
path: Path,
attr: JBasicFileAttributes,
depth: Int,
ancestry: List[Either[Path, NioFileKey]]
)

protected def walkJustInTime(
override def walkWithAttributes(
start: Path,
options: WalkOptions
): Stream[F, PathInfo] = {
Expand Down
41 changes: 0 additions & 41 deletions io/jvm/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala

This file was deleted.

38 changes: 0 additions & 38 deletions io/native/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala

This file was deleted.

49 changes: 0 additions & 49 deletions io/shared/src/main/scala/fs2/io/file/Files.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import cats.effect.std.Hotswap
import cats.syntax.all._

import scala.concurrent.duration._
import cats.Traverse

/** Provides operations related to working with files in the effect `F`.
*
Expand Down Expand Up @@ -525,54 +524,6 @@ object Files extends FilesCompanionPlatform with FilesLowPriority {
case _: NoSuchFileException => ()
})

def walkWithAttributes(start: Path, options: WalkOptions): Stream[F, PathInfo] = {

def go(
start: Path,
maxDepth: Int,
ancestry: List[Either[Path, FileKey]]
): Stream[F, PathInfo] =
Stream.eval(getBasicFileAttributes(start, followLinks = false)).mask.flatMap { attr =>
Stream.emit(PathInfo(start, attr)) ++ {
if (maxDepth == 0) Stream.empty
else if (attr.isDirectory)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}
else if (attr.isSymbolicLink && options.followLinks)
Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap { attr =>
val fileKey = attr.fileKey
val isCycle = Traverse[List].existsM(ancestry) {
case Right(ancestorKey) => F.pure(fileKey.contains(ancestorKey))
case Left(ancestorPath) => isSameFile(start, ancestorPath)
}

Stream.eval(isCycle).flatMap { isCycle =>
if (!isCycle)
list(start).mask.flatMap { path =>
go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry)
}
else if (options.allowCycles)
Stream.empty
else
Stream.raiseError(new FileSystemLoopException(start.toString))
}

}
else
Stream.empty
}
}

go(
start,
options.maxDepth,
Nil
)
.chunkN(options.chunkSize)
.flatMap(Stream.chunk)
}

def writeAll(
path: Path,
flags: Flags
Expand Down

0 comments on commit 52b9e5d

Please sign in to comment.