From 45f49e878868a89988a9051ea055f05a459abc66 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Thu, 16 Dec 2021 21:51:11 -0800 Subject: [PATCH 1/2] Proposed Second Pipeline with Effects and Mutable State --- .../rediculous/RedisConnection.scala | 1 + .../rediculous/RedisPipeline2.scala | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala index 84a42d3..7b41aaa 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisConnection.scala @@ -63,6 +63,7 @@ object RedisConnection{ inputs: NonEmptyList[NonEmptyList[String]], key: Option[String] ): F[NonEmptyList[Resp]] = { + val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest)) def withSocket(socket: Socket[F]): F[NonEmptyList[Resp]] = explicitPipelineRequest[F](socket, chunk).flatMap(l => l.toNel.toRight(RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input")).liftTo[F]) def raiseNonEmpty(chunk: Chunk[Resp]): F[NonEmptyList[Resp]] = diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala new file mode 100644 index 0000000..dd03276 --- /dev/null +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala @@ -0,0 +1,76 @@ +package io.chrisdavenport.rediculous + +import cats._ +import cats.implicits._ +import cats.data.{NonEmptyList, Chain, Nested, Kleisli} +import cats.effect._ + +case class RedisPipeline2[F[_], A](value: F[Ref[F, (Chain[NonEmptyList[String]], Option[String])] => F[RedisTransaction.Queued[A]]]) + +object RedisPipeline2 { + + implicit def ctx[F[_]: Monad]: RedisCtx[RedisPipeline2[F, *]] = new RedisCtx[RedisPipeline2[F, *]]{ + def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): RedisPipeline2[F, A] = RedisPipeline2[F, A](Applicative[F].pure{ + (ref: Ref[F, (Chain[NonEmptyList[String]], Option[String])]) => + ref.modify{ + case (base, value) => + val newCommands = base.append(command) + (newCommands, value.orElse(Some(key))) -> newCommands.size + }.map(i => + RedisTransaction.Queued(l => RedisResult[A].decode(l(i.toInt))) + ) + }) + + def unkeyed[A: RedisResult](command: NonEmptyList[String]): RedisPipeline2[F, A] = RedisPipeline2[F, A](Applicative[F].pure{ + (ref: Ref[F, (Chain[NonEmptyList[String]], Option[String])]) => + ref.modify{ + case (base, value) => + val newCommands = base.append(command) + (newCommands, value) -> newCommands.size + }.map(i => + RedisTransaction.Queued(l => RedisResult[A].decode(l(i.toInt))) + ) + }) + } + + implicit def applicative[F[_]: Applicative]: Applicative[RedisPipeline2[F, *]] = new Applicative[RedisPipeline2[F, *]]{ + def pure[A](a: A) = RedisPipeline2(Applicative[F].pure(_ => Applicative[F].pure(Monad[RedisTransaction.Queued].pure(a)))) + + override def ap[A, B](ff: RedisPipeline2[F, A => B])(fa: RedisPipeline2[F, A]): RedisPipeline2[F, B] = + RedisPipeline2( + ( + ff.value, + fa.value + ).mapN{ + case (qFF, qFA) => + {case (ref) => + (qFF(ref), qFA(ref)).mapN{ + case (qff, qfa) => qff.ap(qfa) + } + } + } + ) + } + + def toRedis[F[_]: Concurrent, A](pipeline: RedisPipeline2[F, A]): Redis[F, A] = Redis(Kleisli{connection => + Concurrent[F].ref((Chain.empty[NonEmptyList[String]], Option.empty[String])).flatMap( + ref => + pipeline.value.flatMap{f => + f(ref).flatMap{ queued => + ref.get.flatMap{ + case ((chain, key)) => + val commands = chain.toList.toNel + commands.traverse(nelCommands => + RedisConnection.runRequestInternal(connection)(nelCommands, key) // We Have to Actually Send A Command + .flatMap{nel => RedisConnection.closeReturn[F, A](queued.f(nel.toList))} + ).flatMap{ + case Some(a) => a.pure[F] + case None => Concurrent[F].raiseError(RedisError.Generic("Rediculous: Attempted to Pipeline Empty Command")) + } + } + } + } + ) + }) + +} \ No newline at end of file From 8f079ed3c33b9df106962f1d5125308f5cf1fc10 Mon Sep 17 00:00:00 2001 From: Christopher Davenport Date: Thu, 16 Dec 2021 22:11:40 -0800 Subject: [PATCH 2/2] Fix it up --- .../rediculous/RedisPipeline2.scala | 18 +++++++++++++----- examples/src/main/scala/PipelineExample.scala | 17 +++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala b/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala index dd03276..c79d796 100644 --- a/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala +++ b/core/src/main/scala/io/chrisdavenport/rediculous/RedisPipeline2.scala @@ -5,7 +5,9 @@ import cats.implicits._ import cats.data.{NonEmptyList, Chain, Nested, Kleisli} import cats.effect._ -case class RedisPipeline2[F[_], A](value: F[Ref[F, (Chain[NonEmptyList[String]], Option[String])] => F[RedisTransaction.Queued[A]]]) +case class RedisPipeline2[F[_], A](value: F[Ref[F, (Chain[NonEmptyList[String]], Option[String])] => F[RedisTransaction.Queued[A]]]){ + def pipeline(implicit ev: Concurrent[F]): Redis[F, A] = RedisPipeline2.toRedis[F, A](this) +} object RedisPipeline2 { @@ -15,9 +17,12 @@ object RedisPipeline2 { ref.modify{ case (base, value) => val newCommands = base.append(command) - (newCommands, value.orElse(Some(key))) -> newCommands.size + (newCommands, value.orElse(Some(key))) -> base.size }.map(i => - RedisTransaction.Queued(l => RedisResult[A].decode(l(i.toInt))) + RedisTransaction.Queued{l => + val out = RedisResult[A].decode(l(i.toInt)) + out + } ) }) @@ -26,7 +31,7 @@ object RedisPipeline2 { ref.modify{ case (base, value) => val newCommands = base.append(command) - (newCommands, value) -> newCommands.size + (newCommands, value) -> base.size }.map(i => RedisTransaction.Queued(l => RedisResult[A].decode(l(i.toInt))) ) @@ -62,7 +67,10 @@ object RedisPipeline2 { val commands = chain.toList.toNel commands.traverse(nelCommands => RedisConnection.runRequestInternal(connection)(nelCommands, key) // We Have to Actually Send A Command - .flatMap{nel => RedisConnection.closeReturn[F, A](queued.f(nel.toList))} + .flatMap{nel => + println(s"Got back nel: $nel") + RedisConnection.closeReturn[F, A](queued.f(nel.toList)) + } ).flatMap{ case Some(a) => a.pure[F] case None => Concurrent[F].raiseError(RedisError.Generic("Rediculous: Attempted to Pipeline Empty Command")) diff --git a/examples/src/main/scala/PipelineExample.scala b/examples/src/main/scala/PipelineExample.scala index e66b3b3..1e40cfc 100644 --- a/examples/src/main/scala/PipelineExample.scala +++ b/examples/src/main/scala/PipelineExample.scala @@ -7,6 +7,7 @@ import com.comcast.ip4s._ // Send a Single Set of Pipelined Commands to the Redis Server object PipelineExample extends IOApp { + type RedisPipelineIO[A] = RedisPipeline2[IO, A] def run(args: List[String]): IO[ExitCode] = { val r = for { // maxQueued: How many elements before new submissions semantically block. Tradeoff of memory to queue jobs. @@ -16,15 +17,15 @@ object PipelineExample extends IOApp { } yield connection r.use {client => - val r = ( - RedisCommands.ping[RedisPipeline], - RedisCommands.del[RedisPipeline]("foo"), - RedisCommands.get[RedisPipeline]("foo"), - RedisCommands.set[RedisPipeline]("foo", "value"), - RedisCommands.get[RedisPipeline]("foo") - ).tupled + val r = List.fill(10000)(( + RedisCommands.ping[RedisPipelineIO], + RedisCommands.del[RedisPipelineIO]("foo"), + RedisCommands.get[RedisPipelineIO]("foo"), + RedisCommands.set[RedisPipelineIO]("foo", "value"), + RedisCommands.get[RedisPipelineIO]("foo") + ).tupled).sequence - val multi = r.pipeline[IO] + val multi = r.pipeline multi.run(client).flatTap(output => IO(println(output)))