diff --git a/core/build.sbt b/core/build.sbt index eeeec35..7a45bf0 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -4,7 +4,7 @@ resolvers ++= Seq( libraryDependencies ++= { Seq( - "co.fs2" %% "fs2-core" % "0.10.1", - "io.verizon.ermine" %% "parser" % "0.5.8" + "co.fs2" %% "fs2-core" % "1.0.0", + "io.verizon.ermine" %% "parser" % "0.5.9" ) } diff --git a/core/src/main/scala/knobs/BaseConfig.scala b/core/src/main/scala/knobs/BaseConfig.scala index 4f53366..616cc6c 100644 --- a/core/src/main/scala/knobs/BaseConfig.scala +++ b/core/src/main/scala/knobs/BaseConfig.scala @@ -17,9 +17,9 @@ package knobs import cats._ -import cats.effect.Effect +import cats.effect.ConcurrentEffect import cats.implicits._ -import fs2.async.Ref +import cats.effect.concurrent.Ref /** * Global configuration data. This is the top-level config from which @@ -41,12 +41,12 @@ case class BaseConfig[F[_]](paths: Ref[F, List[(Name, KnobsResource)]], def at(root: String)(implicit F: Functor[F]): F[Config] = cfgMap.get.map(Config(_).subconfig(root)) - def reload(implicit F: Effect[F]): F[Unit] = for { + def reload(implicit F: ConcurrentEffect[F]): F[Unit] = for { ps <- paths.get mp <- loadFiles(ps.map(_._2)).flatMap(flatten(ps, _)) - m <- cfgMap.modify2(m => (mp, m)) + m <- cfgMap.modify(m => (mp, m)) s <- subs.get - _ <- notifySubscribers(m._2, mp, s) + _ <- notifySubscribers(m, mp, s) } yield () } diff --git a/core/src/main/scala/knobs/MutableConfig.scala b/core/src/main/scala/knobs/MutableConfig.scala index 3a01be6..ff8165c 100644 --- a/core/src/main/scala/knobs/MutableConfig.scala +++ b/core/src/main/scala/knobs/MutableConfig.scala @@ -17,12 +17,11 @@ package knobs import cats._ -import cats.effect.{Effect, Sync} +import cats.effect.{Effect, Sync, Concurrent, ConcurrentEffect} import cats.implicits._ import fs2.Stream -import fs2.async.signalOf +import fs2.concurrent.SignallingRef -import scala.concurrent.ExecutionContext /** Mutable, reloadable, configuration data */ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) { @@ -41,23 +40,23 @@ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) { * just the local section. Any overridden properties set with `addProperties` * will disappear. */ - def reload(implicit F: Effect[F]): F[Unit] = base.reload + def reload(implicit F: ConcurrentEffect[F]): F[Unit] = base.reload /** * Add additional files to this `MutableConfig`, causing it to be reloaded to * add their contents. */ - def add(paths: List[KnobsResource])(implicit F: Effect[F]): F[Unit] = + def add(paths: List[KnobsResource])(implicit F: ConcurrentEffect[F]): F[Unit] = addGroups(paths.map(x => ("", x))) /** * Add additional files to named groups in this `MutableConfig`, causing it to be * reloaded to add their contents. */ - def addGroups(paths: List[(Name, KnobsResource)])(implicit F: Effect[F]): F[Unit] = { + def addGroups(paths: List[(Name, KnobsResource)])(implicit F: ConcurrentEffect[F]): F[Unit] = { def fix[A](p: (String, A)) = (addDot(p._1), p._2) for { - _ <- base.paths.modify(prev => prev ++ paths.map(fix)) + _ <- base.paths.update(prev => prev ++ paths.map(fix)) _ <- base.reload } yield () } @@ -68,11 +67,11 @@ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) { * will be lost. */ def addEnv(props: Env)(implicit F: Effect[F]): F[Unit] = for { - p <- base.cfgMap.modify2 { m => + p <- base.cfgMap.modify { m => val mp = m ++ props.map { case (k, v) => (root + k, v) } (mp, (m, mp)) } - (_, (m, mp)) = p + (m, mp) = p subs <- base.subs.get _ <- notifySubscribers(m, mp, subs) } yield () @@ -156,20 +155,20 @@ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) { * Subscribe to notifications. The given handler will be invoked when any * change occurs to a configuration property that matches the pattern. */ - def subscribe(p: Pattern, h: ChangeHandler[F])(implicit F: Apply[F]): F[Unit] = - base.subs.modify { map => + def subscribe(p: Pattern, h: ChangeHandler[F]): F[Unit] = + base.subs.update { map => map.get(p.local(root)) match { case None => map + ((p.local(root), List(h))) case Some(existing) => map + ((p.local(root), existing ++ List(h))) } - }.void + } /** * A process that produces chages to the configuration properties that match * the given pattern */ - def changes(p: Pattern)(implicit F: Effect[F], ec: ExecutionContext): Stream[F, (Name, Option[CfgValue])] = { - val signal = signalOf[F, (Name, Option[CfgValue])](("", None)) // TP: Not sure about the soundness of this default? + def changes(p: Pattern)(implicit F: Concurrent[F]): Stream[F, (Name, Option[CfgValue])] = { + val signal = SignallingRef[F, (Name, Option[CfgValue])](("", None)) // TP: Not sure about the soundness of this default? Stream.eval { for { diff --git a/core/src/main/scala/knobs/Resource.scala b/core/src/main/scala/knobs/Resource.scala index db0b856..8700ce3 100644 --- a/core/src/main/scala/knobs/Resource.scala +++ b/core/src/main/scala/knobs/Resource.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import cats.Show import cats.data.NonEmptyVector -import cats.effect.{Async, Effect, Sync} +import cats.effect.{Async, Sync, Concurrent} import cats.implicits._ import fs2.Stream @@ -50,7 +50,7 @@ trait Resource[R] extends Show[R] { } trait Watchable[R] extends Resource[R] { - def watch[F[_]](path: Worth[R])(implicit F: Effect[F]): F[(List[Directive], Stream[F, Unit])] + def watch[F[_]](path: Worth[R])(implicit F: Concurrent[F]): F[(List[Directive], Stream[F, Unit])] } /** An existential resource. Equivalent to the (Haskell-style) type `exists r. Resource r ⇒ r` */ @@ -93,7 +93,7 @@ object FileResource { * Optionally creates a process to watch changes to the file and * reload any `MutableConfig` if it has changed. */ - def apply(f: File, watched: Boolean = true)(implicit ec: ExecutionContext): ResourceBox = { + def apply(f: File, watched: Boolean = true): ResourceBox = { val _ = watched Watched(f.getCanonicalFile) } @@ -102,7 +102,7 @@ object FileResource { * Creates a new resource that loads a configuration from a file. * Does not watch the file for changes or reload the config automatically. */ - def unwatched(f: File)(implicit ec: ExecutionContext): ResourceBox = Resource.box(f.getCanonicalFile) + def unwatched(f: File): ResourceBox = Resource.box(f.getCanonicalFile) } object ClassPathResource { @@ -152,7 +152,7 @@ object Resource { private val watchService: WatchService = FileSystems.getDefault.newWatchService private def watchStream[F[_]](implicit F: Async[F]): Stream[F, WatchEvent[_]] = - Stream.eval(F.shift(watchPool) *> F.delay(watchService.take)).flatMap(s => + Stream.eval(Async.shift[F](watchPool) *> F.delay(watchService.take)).flatMap(s => Stream.emits(s.pollEvents.asScala)).repeat /** Construct a new boxed resource from a valid `Resource` */ @@ -222,7 +222,7 @@ object Resource { } yield watchStream.filter(_.context == f) } - implicit def fileResource(implicit ec: ExecutionContext): Watchable[File] = new Watchable[File] { + implicit def fileResource: Watchable[File] = new Watchable[File] { def resolve(r: File, child: String): File = new File(resolveName(r.getPath, child)) @@ -231,11 +231,11 @@ object Resource { override def show(r: File) = r.toString - def watch[F[_]](path: Worth[File])(implicit F: Effect[F]) = for { + def watch[F[_]](path: Worth[File])(implicit F: Concurrent[F]) = for { ds <- load(path) rs <- recursiveImports(path.worth, ds) es <- (path.worth :: rs).traverse(f => watchEvent(f.toPath)) - } yield (ds, Stream.emits(es.map(_.map(_ => ()))).joinUnbounded) + } yield (ds, Stream.emits(es.map(_.map(_ => ()))).parJoinUnbounded) } implicit def uriResource: Resource[URI] = new Resource[URI] { diff --git a/core/src/main/scala/knobs/package.scala b/core/src/main/scala/knobs/package.scala index e2c766e..7172cb5 100644 --- a/core/src/main/scala/knobs/package.scala +++ b/core/src/main/scala/knobs/package.scala @@ -17,8 +17,7 @@ import cats.effect._ import cats.implicits._ import fs2.Stream -import fs2.async.Ref -import scala.concurrent.ExecutionContext +import cats.effect.concurrent.Ref package object knobs { import Resource._ @@ -55,9 +54,8 @@ package object knobs { * first time they are opened, so you can specify a file name such as * `"$(HOME)/myapp.cfg"`. */ - def load[F[_]: Effect](files: List[KnobsResource], - pool: ExecutionContext = Resource.notificationPool): F[MutableConfig[F]] = - loadp(files.map(f => ("", f)), pool).map(MutableConfig("", _)) + def load[F[_]: ConcurrentEffect](files: List[KnobsResource]): F[MutableConfig[F]] = + loadp(files.map(f => ("", f))).map(MutableConfig("", _)) /** * Create an immutable `Config` from the contents of the named files. Throws an @@ -67,22 +65,20 @@ package object knobs { * first time they are opened, so you can specify a file name such as * `"$(HOME)/myapp.cfg"`. */ - def loadImmutable[F[_]: Effect](files: List[KnobsResource], - pool: ExecutionContext = Resource.notificationPool): F[Config] = + def loadImmutable[F[_]: ConcurrentEffect](files: List[KnobsResource]): F[Config] = load(files.map(_.map { case WatchBox(b, w) => ResourceBox(b)(w) case x => x - }), pool).flatMap(_.immutable) + })).flatMap(_.immutable) /** * Create a `MutableConfig` from the contents of the named files, placing them * into named prefixes. */ - def loadGroups[F[_]: Effect](files: List[(Name, KnobsResource)], - pool: ExecutionContext = Resource.notificationPool): F[MutableConfig[F]] = - loadp(files, pool).map(MutableConfig("", _)) + def loadGroups[F[_]: ConcurrentEffect](files: List[(Name, KnobsResource)]): F[MutableConfig[F]] = + loadp(files).map(MutableConfig("", _)) - private [knobs] def loadFiles[F[_]: Effect](paths: List[KnobsResource]): F[Loaded[F]] = { + private [knobs] def loadFiles[F[_]: ConcurrentEffect](paths: List[KnobsResource]): F[Loaded[F]] = { def go(seen: Loaded[F], path: KnobsResource): F[Loaded[F]] = { def notSeen(n: KnobsResource): Boolean = seen.get(n).isEmpty for { @@ -98,16 +94,15 @@ package object knobs { } private [knobs] def loadp[F[_]]( - paths: List[(Name, KnobsResource)], - pool: ExecutionContext = Resource.notificationPool)(implicit F: Effect[F]): F[BaseConfig[F]] = { - implicit val implicitPool = pool + paths: List[(Name, KnobsResource)] + )(implicit F: ConcurrentEffect[F]): F[BaseConfig[F]] = { for { loaded <- loadFiles(paths.map(_._2)) - p <- Ref(paths) - m <- flatten(paths, loaded).flatMap(Ref(_)) - s <- Ref(Map[Pattern, List[ChangeHandler[F]]]()) + p <- Ref[F].of(paths) + m <- flatten(paths, loaded).flatMap(Ref[F].of(_)) + s <- Ref[F].of(Map[Pattern, List[ChangeHandler[F]]]()) bc = BaseConfig(paths = p, cfgMap = m, subs = s) - ticks = Stream.emits(loaded.values.map(_._2).toSeq).joinUnbounded + ticks = Stream.emits(loaded.values.map(_._2).toSeq).parJoinUnbounded _ <- F.delay(F.runAsync(ticks.evalMap(_ => bc.reload).compile.drain)(_ => IO.unit).unsafeRunSync) } yield bc } @@ -222,7 +217,7 @@ package object knobs { implicitly[Resource[R]].load(Required(r)).flatMap(dds => recursiveImports(r, dds)))) - private [knobs] def loadOne[F[_]: Effect](path: KnobsResource): F[(List[Directive], Stream[F, Unit])] = { + private [knobs] def loadOne[F[_]: ConcurrentEffect](path: KnobsResource): F[(List[Directive], Stream[F, Unit])] = { val box = path.worth val r: box.R = box.resource box.watchable match { diff --git a/core/src/test/scala/knobs/FileWatcherTest.scala b/core/src/test/scala/knobs/FileWatcherTest.scala index 00800fb..415ac9c 100644 --- a/core/src/test/scala/knobs/FileWatcherTest.scala +++ b/core/src/test/scala/knobs/FileWatcherTest.scala @@ -16,28 +16,32 @@ //: ---------------------------------------------------------------------------- package knobs -import fs2.async.Ref -import java.nio.file.{ Files, Paths } + +import cats.effect.concurrent._ +import java.nio.file.{Files, Paths} import java.util.concurrent.CountDownLatch + import org.scalacheck._ import org.scalacheck.Prop._ -import scala.concurrent.ExecutionContext.Implicits.global - import cats.effect.IO +import scala.concurrent.ExecutionContext + object FileWatcherTests extends Properties("FileWatch") { + implicit val cs = IO.contextShift(ExecutionContext.global) + property("file watch") = { val mutantUri = Thread.currentThread.getContextClassLoader.getResource("mutant.cfg").toURI val mutantPath = Paths.get(mutantUri) val latch = new CountDownLatch(1) val prg = for { - ref <- Ref[IO, String]("") + ref <- Ref.of[IO, String]("") _ <- IO(Files.write(mutantPath, "foo = \"bletch\"\n".getBytes)) cfg <- load[IO](List(Required(FileResource(mutantPath.toFile)))) _ <- cfg.subscribe(Exact("foo"), { case ("foo", Some(t: CfgText)) => - ref.setSync(t.pretty).flatMap(_ => IO(latch.countDown)) + ref.set(t.pretty).flatMap(_ => IO(latch.countDown)) case _ => { IO(latch.countDown) } diff --git a/core/src/test/scala/knobs/Test.scala b/core/src/test/scala/knobs/Test.scala index df57cef..5769461 100644 --- a/core/src/test/scala/knobs/Test.scala +++ b/core/src/test/scala/knobs/Test.scala @@ -18,11 +18,14 @@ package knobs import org.scalacheck._ import Prop._ -import scala.concurrent.duration._ +import scala.concurrent.duration._ import cats.effect.IO import cats.implicits._ +import scala.concurrent.ExecutionContext + + object Test extends Properties("Knobs") { // TODO remove when available in Scalacheck @@ -30,6 +33,8 @@ object Test extends Properties("Knobs") { private implicit val arbFiniteDuration: Arbitrary[FiniteDuration] = Arbitrary( Gen.chooseNum(Long.MinValue + 1, Long.MaxValue).map(Duration.fromNanos)) + implicit val cs = IO.contextShift(ExecutionContext.global) + def withLoad[A](files: List[KnobsResource])( t: MutableConfig[IO] => IO[A]): IO[A] = for { mb <- load[IO](files) diff --git a/typesafe/build.sbt b/typesafe/build.sbt index e4ce45b..c1f3c5c 100644 --- a/typesafe/build.sbt +++ b/typesafe/build.sbt @@ -1,2 +1,2 @@ // 1.3.0 is out but it depends on java 1.8. -libraryDependencies += "com.typesafe" % "config" % "1.2.1" +libraryDependencies += "com.typesafe" % "config" % "1.3.3" diff --git a/version.sbt b/version.sbt index c0c0702..b8b4128 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "6.0.0-SNAPSHOT" +version in ThisBuild := "7.0.0-SNAPSHOT" diff --git a/zookeeper/build.sbt b/zookeeper/build.sbt index 615f38f..1743371 100644 --- a/zookeeper/build.sbt +++ b/zookeeper/build.sbt @@ -1,4 +1,4 @@ libraryDependencies ++= Seq( - "org.apache.curator" % "curator-framework" % "2.8.0", - "org.apache.curator" % "curator-test" % "2.8.0" % "test" + "org.apache.curator" % "curator-framework" % "4.0.1", + "org.apache.curator" % "curator-test" % "4.0.1" % "test" ) diff --git a/zookeeper/src/main/scala/knobs/Zookeeper.scala b/zookeeper/src/main/scala/knobs/Zookeeper.scala index fa3464c..4c8bab9 100644 --- a/zookeeper/src/main/scala/knobs/Zookeeper.scala +++ b/zookeeper/src/main/scala/knobs/Zookeeper.scala @@ -22,12 +22,11 @@ import org.apache.curator.retry._ import org.apache.zookeeper.WatchedEvent import org.apache.zookeeper.Watcher.Event.EventType._ -import scala.concurrent.ExecutionContext - -import cats.effect.{Async, Effect, Sync} +import cats.effect.{Async, Sync, ConcurrentEffect, Concurrent} import cats.implicits._ import fs2.Stream + /** * A ZNode contains a `path` to a node in the ZooKeeper tree * provided by the given `client`. The `client` is assumed to be started. @@ -56,7 +55,7 @@ object ZooKeeper { } catch { case e: Exception => k(Left(e)) } }).repeat - implicit def zkResource(implicit ec: ExecutionContext): Watchable[ZNode] = new Watchable[ZNode] { + implicit def zkResource: Watchable[ZNode] = new Watchable[ZNode] { def resolve(r: ZNode, child: Path): ZNode = r.copy(path = Resource.resolveName(r.path, child)) def load[F[_]](node: Worth[ZNode])(implicit F: Sync[F]) = { @@ -65,18 +64,18 @@ object ZooKeeper { new String(c.getData.forPath(path).map(_.toChar)) }) } - def watch[F[_]](node: Worth[ZNode])(implicit F: Effect[F]) = for { + def watch[F[_]](node: Worth[ZNode])(implicit F: Concurrent[F]) = for { ds <- load(node) rs <- recursiveImports(node.worth, ds) reloads <- F.delay { Stream.emits(node.worth +: rs).map { case ZNode(c, path) => watchEvent(c, path).map(_ => ()) }} - } yield (ds, reloads.joinUnbounded) + } yield (ds, reloads.parJoinUnbounded) def show(t: ZNode): String = t.toString } - private def doZK[F[_]](config: List[KnobsResource])(implicit F: Effect[F], ec: ExecutionContext): F[(ResourceBox, CuratorFramework)] = { + private def doZK[F[_]](config: List[KnobsResource])(implicit F: ConcurrentEffect[F]): F[(ResourceBox, CuratorFramework)] = { val retryPolicy = new ExponentialBackoffRetry(1000, 3) @@ -113,7 +112,7 @@ object ZooKeeper { * } yield () }.run * ``` */ - def withDefault[F[_]](k: ResourceBox => F[Unit])(implicit F: Effect[F], ec: ExecutionContext): F[Unit] = safe(k) + def withDefault[F[_]](k: ResourceBox => F[Unit])(implicit F: ConcurrentEffect[F]): F[Unit] = safe(k) /** * IO-based API. Works just like `withDefault` except it loads configuration from @@ -130,9 +129,9 @@ object ZooKeeper { * } yield () }.run * ``` */ - def fromResource[F[_]](customConfig: List[KnobsResource])(k: ResourceBox => F[Unit])(implicit F: Effect[F], ec: ExecutionContext): F[Unit] = safe(k, customConfig) + def fromResource[F[_]](customConfig: List[KnobsResource])(k: ResourceBox => F[Unit])(implicit F: ConcurrentEffect[F]): F[Unit] = safe(k, customConfig) - protected def safe[F[_]](k: ResourceBox => F[Unit], config: List[KnobsResource] = null)(implicit F: Effect[F], ec: ExecutionContext): F[Unit] = for { + protected def safe[F[_]](k: ResourceBox => F[Unit], config: List[KnobsResource] = null)(implicit F: ConcurrentEffect[F]): F[Unit] = for { p <- doZK(config) (box, c) = p _ <- k(box) diff --git a/zookeeper/src/test/scala/knobs/ZookeeperTest.scala b/zookeeper/src/test/scala/knobs/ZookeeperTest.scala index 6654eb7..bbc319d 100644 --- a/zookeeper/src/test/scala/knobs/ZookeeperTest.scala +++ b/zookeeper/src/test/scala/knobs/ZookeeperTest.scala @@ -18,19 +18,23 @@ package knobs import Resource._ import cats.effect.IO -import fs2.async.Ref +import cats.effect.concurrent.Ref import java.util.concurrent.CountDownLatch + import org.apache.curator.framework._ import org.apache.curator.retry._ import org.apache.curator.test._ import org.scalacheck.Prop._ import org.scalacheck._ -import scala.concurrent.ExecutionContext.Implicits.global + +import scala.concurrent.ExecutionContext object ZooKeeperTests extends Properties("ZooKeeper") { val retryPolicy = new ExponentialBackoffRetry(1000, 3) + implicit val cs = IO.contextShift(ExecutionContext.global) + import ZooKeeper._ property("load config") = { @@ -56,12 +60,12 @@ object ZooKeeperTests extends Properties("ZooKeeper") { c.create.forPath("/knobs.cfg", "foo = 10\n".toArray.map(_.toByte)) val latch = new CountDownLatch(1) val prg = for { - ref <- Ref[IO, Int](0) + ref <- Ref.of[IO, Int](0) cfg <- load[IO](List(Required(Watched(ZNode(c, "/knobs.cfg"))))) n1 <- cfg.require[Int]("foo") _ <- cfg.subscribe(Exact("foo"), { case ("foo", Some(CfgNumber(n))) => - ref.setSync(n.toInt).flatMap(_ => IO(latch.countDown)) + ref.set(n.toInt).flatMap(_ => IO(latch.countDown)) case _ => IO(latch.countDown) }) _ <- IO(Thread.sleep(1000))