Skip to content

Commit

Permalink
Merge pull request Verizon#64 from dbousamra/chore/upgrade-fs2-cats-e…
Browse files Browse the repository at this point in the history
…ffect

fs2-core-1.0.0
  • Loading branch information
rossabaker authored Oct 9, 2018
2 parents 1481cb3 + 69db524 commit 93856e4
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 74 deletions.
4 changes: 2 additions & 2 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ resolvers ++= Seq(

libraryDependencies ++= {
Seq(
"co.fs2" %% "fs2-core" % "0.10.1",
"io.verizon.ermine" %% "parser" % "0.5.8"
"co.fs2" %% "fs2-core" % "1.0.0",
"io.verizon.ermine" %% "parser" % "0.5.9"
)
}
10 changes: 5 additions & 5 deletions core/src/main/scala/knobs/BaseConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package knobs

import cats._
import cats.effect.Effect
import cats.effect.ConcurrentEffect
import cats.implicits._
import fs2.async.Ref
import cats.effect.concurrent.Ref

/**
* Global configuration data. This is the top-level config from which
Expand All @@ -41,12 +41,12 @@ case class BaseConfig[F[_]](paths: Ref[F, List[(Name, KnobsResource)]],
def at(root: String)(implicit F: Functor[F]): F[Config] =
cfgMap.get.map(Config(_).subconfig(root))

def reload(implicit F: Effect[F]): F[Unit] = for {
def reload(implicit F: ConcurrentEffect[F]): F[Unit] = for {
ps <- paths.get
mp <- loadFiles(ps.map(_._2)).flatMap(flatten(ps, _))
m <- cfgMap.modify2(m => (mp, m))
m <- cfgMap.modify(m => (mp, m))
s <- subs.get
_ <- notifySubscribers(m._2, mp, s)
_ <- notifySubscribers(m, mp, s)
} yield ()
}

Expand Down
27 changes: 13 additions & 14 deletions core/src/main/scala/knobs/MutableConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package knobs

import cats._
import cats.effect.{Effect, Sync}
import cats.effect.{Effect, Sync, Concurrent, ConcurrentEffect}
import cats.implicits._
import fs2.Stream
import fs2.async.signalOf
import fs2.concurrent.SignallingRef

import scala.concurrent.ExecutionContext

/** Mutable, reloadable, configuration data */
case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) {
Expand All @@ -41,23 +40,23 @@ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) {
* just the local section. Any overridden properties set with `addProperties`
* will disappear.
*/
def reload(implicit F: Effect[F]): F[Unit] = base.reload
def reload(implicit F: ConcurrentEffect[F]): F[Unit] = base.reload

/**
* Add additional files to this `MutableConfig`, causing it to be reloaded to
* add their contents.
*/
def add(paths: List[KnobsResource])(implicit F: Effect[F]): F[Unit] =
def add(paths: List[KnobsResource])(implicit F: ConcurrentEffect[F]): F[Unit] =
addGroups(paths.map(x => ("", x)))

/**
* Add additional files to named groups in this `MutableConfig`, causing it to be
* reloaded to add their contents.
*/
def addGroups(paths: List[(Name, KnobsResource)])(implicit F: Effect[F]): F[Unit] = {
def addGroups(paths: List[(Name, KnobsResource)])(implicit F: ConcurrentEffect[F]): F[Unit] = {
def fix[A](p: (String, A)) = (addDot(p._1), p._2)
for {
_ <- base.paths.modify(prev => prev ++ paths.map(fix))
_ <- base.paths.update(prev => prev ++ paths.map(fix))
_ <- base.reload
} yield ()
}
Expand All @@ -68,11 +67,11 @@ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) {
* will be lost.
*/
def addEnv(props: Env)(implicit F: Effect[F]): F[Unit] = for {
p <- base.cfgMap.modify2 { m =>
p <- base.cfgMap.modify { m =>
val mp = m ++ props.map { case (k, v) => (root + k, v) }
(mp, (m, mp))
}
(_, (m, mp)) = p
(m, mp) = p
subs <- base.subs.get
_ <- notifySubscribers(m, mp, subs)
} yield ()
Expand Down Expand Up @@ -156,20 +155,20 @@ case class MutableConfig[F[_]](root: String, base: BaseConfig[F]) {
* Subscribe to notifications. The given handler will be invoked when any
* change occurs to a configuration property that matches the pattern.
*/
def subscribe(p: Pattern, h: ChangeHandler[F])(implicit F: Apply[F]): F[Unit] =
base.subs.modify { map =>
def subscribe(p: Pattern, h: ChangeHandler[F]): F[Unit] =
base.subs.update { map =>
map.get(p.local(root)) match {
case None => map + ((p.local(root), List(h)))
case Some(existing) => map + ((p.local(root), existing ++ List(h)))
}
}.void
}

/**
* A process that produces chages to the configuration properties that match
* the given pattern
*/
def changes(p: Pattern)(implicit F: Effect[F], ec: ExecutionContext): Stream[F, (Name, Option[CfgValue])] = {
val signal = signalOf[F, (Name, Option[CfgValue])](("", None)) // TP: Not sure about the soundness of this default?
def changes(p: Pattern)(implicit F: Concurrent[F]): Stream[F, (Name, Option[CfgValue])] = {
val signal = SignallingRef[F, (Name, Option[CfgValue])](("", None)) // TP: Not sure about the soundness of this default?

Stream.eval {
for {
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/knobs/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._

import cats.Show
import cats.data.NonEmptyVector
import cats.effect.{Async, Effect, Sync}
import cats.effect.{Async, Sync, Concurrent}
import cats.implicits._
import fs2.Stream

Expand All @@ -50,7 +50,7 @@ trait Resource[R] extends Show[R] {
}

trait Watchable[R] extends Resource[R] {
def watch[F[_]](path: Worth[R])(implicit F: Effect[F]): F[(List[Directive], Stream[F, Unit])]
def watch[F[_]](path: Worth[R])(implicit F: Concurrent[F]): F[(List[Directive], Stream[F, Unit])]
}

/** An existential resource. Equivalent to the (Haskell-style) type `exists r. Resource r ⇒ r` */
Expand Down Expand Up @@ -93,7 +93,7 @@ object FileResource {
* Optionally creates a process to watch changes to the file and
* reload any `MutableConfig` if it has changed.
*/
def apply(f: File, watched: Boolean = true)(implicit ec: ExecutionContext): ResourceBox = {
def apply(f: File, watched: Boolean = true): ResourceBox = {
val _ = watched
Watched(f.getCanonicalFile)
}
Expand All @@ -102,7 +102,7 @@ object FileResource {
* Creates a new resource that loads a configuration from a file.
* Does not watch the file for changes or reload the config automatically.
*/
def unwatched(f: File)(implicit ec: ExecutionContext): ResourceBox = Resource.box(f.getCanonicalFile)
def unwatched(f: File): ResourceBox = Resource.box(f.getCanonicalFile)
}

object ClassPathResource {
Expand Down Expand Up @@ -152,7 +152,7 @@ object Resource {

private val watchService: WatchService = FileSystems.getDefault.newWatchService
private def watchStream[F[_]](implicit F: Async[F]): Stream[F, WatchEvent[_]] =
Stream.eval(F.shift(watchPool) *> F.delay(watchService.take)).flatMap(s =>
Stream.eval(Async.shift[F](watchPool) *> F.delay(watchService.take)).flatMap(s =>
Stream.emits(s.pollEvents.asScala)).repeat

/** Construct a new boxed resource from a valid `Resource` */
Expand Down Expand Up @@ -222,7 +222,7 @@ object Resource {
} yield watchStream.filter(_.context == f)
}

implicit def fileResource(implicit ec: ExecutionContext): Watchable[File] = new Watchable[File] {
implicit def fileResource: Watchable[File] = new Watchable[File] {
def resolve(r: File, child: String): File =
new File(resolveName(r.getPath, child))

Expand All @@ -231,11 +231,11 @@ object Resource {

override def show(r: File) = r.toString

def watch[F[_]](path: Worth[File])(implicit F: Effect[F]) = for {
def watch[F[_]](path: Worth[File])(implicit F: Concurrent[F]) = for {
ds <- load(path)
rs <- recursiveImports(path.worth, ds)
es <- (path.worth :: rs).traverse(f => watchEvent(f.toPath))
} yield (ds, Stream.emits(es.map(_.map(_ => ()))).joinUnbounded)
} yield (ds, Stream.emits(es.map(_.map(_ => ()))).parJoinUnbounded)
}

implicit def uriResource: Resource[URI] = new Resource[URI] {
Expand Down
35 changes: 15 additions & 20 deletions core/src/main/scala/knobs/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import cats.effect._
import cats.implicits._
import fs2.Stream
import fs2.async.Ref
import scala.concurrent.ExecutionContext
import cats.effect.concurrent.Ref

package object knobs {
import Resource._
Expand Down Expand Up @@ -55,9 +54,8 @@ package object knobs {
* first time they are opened, so you can specify a file name such as
* `"$(HOME)/myapp.cfg"`.
*/
def load[F[_]: Effect](files: List[KnobsResource],
pool: ExecutionContext = Resource.notificationPool): F[MutableConfig[F]] =
loadp(files.map(f => ("", f)), pool).map(MutableConfig("", _))
def load[F[_]: ConcurrentEffect](files: List[KnobsResource]): F[MutableConfig[F]] =
loadp(files.map(f => ("", f))).map(MutableConfig("", _))

/**
* Create an immutable `Config` from the contents of the named files. Throws an
Expand All @@ -67,22 +65,20 @@ package object knobs {
* first time they are opened, so you can specify a file name such as
* `"$(HOME)/myapp.cfg"`.
*/
def loadImmutable[F[_]: Effect](files: List[KnobsResource],
pool: ExecutionContext = Resource.notificationPool): F[Config] =
def loadImmutable[F[_]: ConcurrentEffect](files: List[KnobsResource]): F[Config] =
load(files.map(_.map {
case WatchBox(b, w) => ResourceBox(b)(w)
case x => x
}), pool).flatMap(_.immutable)
})).flatMap(_.immutable)

/**
* Create a `MutableConfig` from the contents of the named files, placing them
* into named prefixes.
*/
def loadGroups[F[_]: Effect](files: List[(Name, KnobsResource)],
pool: ExecutionContext = Resource.notificationPool): F[MutableConfig[F]] =
loadp(files, pool).map(MutableConfig("", _))
def loadGroups[F[_]: ConcurrentEffect](files: List[(Name, KnobsResource)]): F[MutableConfig[F]] =
loadp(files).map(MutableConfig("", _))

private [knobs] def loadFiles[F[_]: Effect](paths: List[KnobsResource]): F[Loaded[F]] = {
private [knobs] def loadFiles[F[_]: ConcurrentEffect](paths: List[KnobsResource]): F[Loaded[F]] = {
def go(seen: Loaded[F], path: KnobsResource): F[Loaded[F]] = {
def notSeen(n: KnobsResource): Boolean = seen.get(n).isEmpty
for {
Expand All @@ -98,16 +94,15 @@ package object knobs {
}

private [knobs] def loadp[F[_]](
paths: List[(Name, KnobsResource)],
pool: ExecutionContext = Resource.notificationPool)(implicit F: Effect[F]): F[BaseConfig[F]] = {
implicit val implicitPool = pool
paths: List[(Name, KnobsResource)]
)(implicit F: ConcurrentEffect[F]): F[BaseConfig[F]] = {
for {
loaded <- loadFiles(paths.map(_._2))
p <- Ref(paths)
m <- flatten(paths, loaded).flatMap(Ref(_))
s <- Ref(Map[Pattern, List[ChangeHandler[F]]]())
p <- Ref[F].of(paths)
m <- flatten(paths, loaded).flatMap(Ref[F].of(_))
s <- Ref[F].of(Map[Pattern, List[ChangeHandler[F]]]())
bc = BaseConfig(paths = p, cfgMap = m, subs = s)
ticks = Stream.emits(loaded.values.map(_._2).toSeq).joinUnbounded
ticks = Stream.emits(loaded.values.map(_._2).toSeq).parJoinUnbounded
_ <- F.delay(F.runAsync(ticks.evalMap(_ => bc.reload).compile.drain)(_ => IO.unit).unsafeRunSync)
} yield bc
}
Expand Down Expand Up @@ -222,7 +217,7 @@ package object knobs {
implicitly[Resource[R]].load(Required(r)).flatMap(dds =>
recursiveImports(r, dds))))

private [knobs] def loadOne[F[_]: Effect](path: KnobsResource): F[(List[Directive], Stream[F, Unit])] = {
private [knobs] def loadOne[F[_]: ConcurrentEffect](path: KnobsResource): F[(List[Directive], Stream[F, Unit])] = {
val box = path.worth
val r: box.R = box.resource
box.watchable match {
Expand Down
16 changes: 10 additions & 6 deletions core/src/test/scala/knobs/FileWatcherTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,32 @@
//: ----------------------------------------------------------------------------
package knobs

import fs2.async.Ref
import java.nio.file.{ Files, Paths }

import cats.effect.concurrent._
import java.nio.file.{Files, Paths}
import java.util.concurrent.CountDownLatch

import org.scalacheck._
import org.scalacheck.Prop._
import scala.concurrent.ExecutionContext.Implicits.global

import cats.effect.IO

import scala.concurrent.ExecutionContext

object FileWatcherTests extends Properties("FileWatch") {

implicit val cs = IO.contextShift(ExecutionContext.global)

property("file watch") = {
val mutantUri = Thread.currentThread.getContextClassLoader.getResource("mutant.cfg").toURI
val mutantPath = Paths.get(mutantUri)
val latch = new CountDownLatch(1)
val prg = for {
ref <- Ref[IO, String]("")
ref <- Ref.of[IO, String]("")
_ <- IO(Files.write(mutantPath, "foo = \"bletch\"\n".getBytes))
cfg <- load[IO](List(Required(FileResource(mutantPath.toFile))))
_ <- cfg.subscribe(Exact("foo"), {
case ("foo", Some(t: CfgText)) =>
ref.setSync(t.pretty).flatMap(_ => IO(latch.countDown))
ref.set(t.pretty).flatMap(_ => IO(latch.countDown))
case _ => {
IO(latch.countDown)
}
Expand Down
7 changes: 6 additions & 1 deletion core/src/test/scala/knobs/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@ package knobs

import org.scalacheck._
import Prop._
import scala.concurrent.duration._

import scala.concurrent.duration._
import cats.effect.IO
import cats.implicits._

import scala.concurrent.ExecutionContext


object Test extends Properties("Knobs") {

// TODO remove when available in Scalacheck
// https://github.com/rickynils/scalacheck/pull/284
private implicit val arbFiniteDuration: Arbitrary[FiniteDuration] = Arbitrary(
Gen.chooseNum(Long.MinValue + 1, Long.MaxValue).map(Duration.fromNanos))

implicit val cs = IO.contextShift(ExecutionContext.global)

def withLoad[A](files: List[KnobsResource])(
t: MutableConfig[IO] => IO[A]): IO[A] = for {
mb <- load[IO](files)
Expand Down
2 changes: 1 addition & 1 deletion typesafe/build.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// 1.3.0 is out but it depends on java 1.8.
libraryDependencies += "com.typesafe" % "config" % "1.2.1"
libraryDependencies += "com.typesafe" % "config" % "1.3.3"
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "6.0.0-SNAPSHOT"
version in ThisBuild := "7.0.0-SNAPSHOT"
4 changes: 2 additions & 2 deletions zookeeper/build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
libraryDependencies ++= Seq(
"org.apache.curator" % "curator-framework" % "2.8.0",
"org.apache.curator" % "curator-test" % "2.8.0" % "test"
"org.apache.curator" % "curator-framework" % "4.0.1",
"org.apache.curator" % "curator-test" % "4.0.1" % "test"
)
Loading

0 comments on commit 93856e4

Please sign in to comment.