From b280cd2b2479db2b808e4ef5b4c5d111ff53de44 Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 2 Apr 2024 20:25:11 +0100 Subject: [PATCH 1/3] feat(storage): Create Hazelcast storage driver --- build.sbt | 36 ++++++--- .../shardcake/HazelcastConfig.scala | 12 +++ .../shardcake/StorageHazelcast.scala | 73 +++++++++++++++++++ .../shardcake/StorageHazelcastSpec.scala | 69 ++++++++++++++++++ 4 files changed, 179 insertions(+), 11 deletions(-) create mode 100644 storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala create mode 100644 storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala create mode 100644 storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala diff --git a/build.sbt b/build.sbt index 96bac9d7..a48d458f 100644 --- a/build.sbt +++ b/build.sbt @@ -2,17 +2,18 @@ val scala213 = "2.13.11" val scala3 = "3.3.0" val allScala = Seq(scala213, scala3) -val zioVersion = "2.0.21" -val zioGrpcVersion = "0.6.0" -val zioK8sVersion = "2.1.1" -val zioCacheVersion = "0.2.3" -val zioCatsInteropVersion = "23.1.0.0" -val sttpVersion = "3.9.3" -val calibanVersion = "2.5.3" -val redis4catsVersion = "1.5.2" -val redissonVersion = "3.27.1" -val scalaKryoVersion = "1.0.2" -val testContainersVersion = "0.41.3" +val zioVersion = "2.0.21" +val zioGrpcVersion = "0.6.0" +val zioK8sVersion = "2.1.1" +val zioCacheVersion = "0.2.3" +val zioCatsInteropVersion = "23.1.0.0" +val sttpVersion = "3.9.3" +val calibanVersion = "2.5.3" +val redis4catsVersion = "1.5.2" +val redissonVersion = "3.27.1" +val hazelcastClientVersion = "5.0.1" +val scalaKryoVersion = "1.0.2" +val testContainersVersion = "0.41.3" inThisBuild( List( @@ -141,6 +142,19 @@ lazy val storageRedisson = project ) ) +lazy val storageHazelcase = project + .in(file("storage-hazelcast")) + .settings(name := "shardcake-storage-hazelcast") + .settings(commonSettings) + .dependsOn(core) + .settings( + libraryDependencies ++= + Seq( + "com.hazelcast" % "hazelcast" % hazelcastClientVersion, + "ch.qos.logback" % "logback-classic" % "1.4.7" + ) + ) + lazy val serializationKryo = project .in(file("serialization-kryo")) .settings(name := "shardcake-serialization-kryo") diff --git a/storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala new file mode 100644 index 00000000..ee646f42 --- /dev/null +++ b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala @@ -0,0 +1,12 @@ +package com.devsisters.shardcake + +/** + * The configuration for the Hazelcast storage implementation. + * @param assignmentsKey the key to store shard assignments + * @param podsKey the key to store registered pods + */ +case class HazelcastConfig(assignmentsKey: String, podsKey: String) + +object HazelcastConfig { + val default: HazelcastConfig = HazelcastConfig(assignmentsKey = "shard_assignments", podsKey = "pods") +} diff --git a/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala new file mode 100644 index 00000000..4058e44e --- /dev/null +++ b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala @@ -0,0 +1,73 @@ +package com.devsisters.shardcake + +import com.devsisters.shardcake.interfaces.Storage +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.topic.{ Message, MessageListener } +import zio.stream.ZStream +import zio.{ Queue, Task, Unsafe, ZIO, ZLayer } + +import scala.jdk.CollectionConverters._ + +object StorageHazelcast { + + /** + * A layer that returns a Storage implementation using Redis + */ + val live: ZLayer[HazelcastInstance with HazelcastConfig, Nothing, Storage] = + ZLayer { + for { + config <- ZIO.service[HazelcastConfig] + hazelcastInstance <- ZIO.service[HazelcastInstance] + assignmentsMap = hazelcastInstance.getMap[String, String](config.assignmentsKey) + podsMap = hazelcastInstance.getMap[String, String](config.podsKey) + assignmentsTopic = hazelcastInstance.getTopic[String](config.assignmentsKey) + } yield new Storage { + def getAssignments: Task[Map[ShardId, Option[PodAddress]]] = + ZIO + .attemptBlocking(assignmentsMap.entrySet()) + .map( + _.asScala.toList + .flatMap(entry => + entry.getKey.toIntOption.map( + _ -> (if (entry.getValue.isEmpty) None + else PodAddress(entry.getValue)) + ) + ) + .toMap + ) + + def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] = + ZIO.attemptBlocking(assignmentsMap.putAll(assignments.map { case (k, v) => + k.toString -> v.fold("")(_.toString) + }.asJava)) *> + ZIO.attemptBlocking(assignmentsTopic.publish("ping")).unit + + def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] = + ZStream.unwrap { + for { + queue <- Queue.unbounded[String] + runtime <- ZIO.runtime[Any] + _ <- ZIO.attemptBlocking( + assignmentsTopic.addMessageListener( + new MessageListener[String] { + def onMessage(msg: Message[String]): Unit = + Unsafe.unsafe(implicit unsafe => runtime.unsafe.run(queue.offer(msg.getMessageObject))) + } + ) + ) + } yield ZStream.fromQueueWithShutdown(queue).mapZIO(_ => getAssignments) + } + + def getPods: Task[Map[PodAddress, Pod]] = + ZIO + .attemptBlocking(podsMap.entrySet()) + .map( + _.asScala + .flatMap(entry => PodAddress(entry.getKey).map(address => address -> Pod(address, entry.getValue))) + .toMap + ) + def savePods(pods: Map[PodAddress, Pod]): Task[Unit] = + ZIO.attemptBlocking(podsMap.putAll(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit + } + } +} diff --git a/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala b/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala new file mode 100644 index 00000000..60b732de --- /dev/null +++ b/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala @@ -0,0 +1,69 @@ +package com.devsisters.shardcake + +import com.devsisters.shardcake.interfaces.Storage +import com.dimafeng.testcontainers.GenericContainer +import com.hazelcast.client.HazelcastClient +import com.hazelcast.client.config.{ ClientConfig, ClientNetworkConfig } +import com.hazelcast.core.HazelcastInstance +import zio.Clock.ClockLive +import zio._ +import zio.stream.ZStream +import zio.test.TestAspect.sequential +import zio.test._ + +object StorageHazelcastSpec extends ZIOSpecDefault { + val container: ZLayer[Any, Nothing, GenericContainer] = + ZLayer.scoped { + ZIO.acquireRelease { + ZIO.attemptBlocking { + val container = new GenericContainer(dockerImage = "hazelcast/hazelcast:5.1.2", exposedPorts = Seq(5701)) + container.start() + container + }.orDie + }(container => ZIO.attemptBlocking(container.stop()).orDie) + } + + val hazelcast: ZLayer[GenericContainer, Throwable, HazelcastInstance] = + ZLayer { + for { + container <- ZIO.service[GenericContainer] + uri = s"${container.host}:${container.mappedPort(container.exposedPorts.head)}" + hazelConfig = new ClientConfig + hazelNetConfig = new ClientNetworkConfig + _ = hazelNetConfig.addAddress(uri) + _ = hazelConfig.setNetworkConfig(hazelNetConfig) + } yield HazelcastClient.newHazelcastClient(hazelConfig) + } + + def spec: Spec[TestEnvironment with Scope, Any] = + suite("StorageRedisSpec")( + test("save and get pods") { + val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0")) + .map(p => p.address -> p) + .toMap + for { + _ <- ZIO.serviceWithZIO[Storage](_.savePods(expected)) + actual <- ZIO.serviceWithZIO[Storage](_.getPods) + } yield assertTrue(expected == actual) + }, + test("save and get assignments") { + val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None) + for { + _ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected)) + actual <- ZIO.serviceWithZIO[Storage](_.getAssignments) + } yield assertTrue(expected == actual) + }, + test("assignments stream") { + val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None) + for { + p <- Promise.make[Nothing, Map[Int, Option[PodAddress]]] + _ <- ZStream.serviceWithStream[Storage](_.assignmentsStream).runForeach(p.succeed(_)).fork + _ <- ClockLive.sleep(1 second) + _ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected)) + actual <- p.await + } yield assertTrue(expected == actual) + } + ).provideLayerShared( + container >>> hazelcast ++ ZLayer.succeed(HazelcastConfig.default) >>> StorageHazelcast.live + ) @@ sequential +} From 4c26df9a71bc2244d51fa57442b2628c81767d9f Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 2 Apr 2024 21:17:47 +0100 Subject: [PATCH 2/3] fix(tests): Change name --- .../scala/com/devsisters/shardcake/StorageHazelcastSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala b/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala index 60b732de..67aa08f2 100644 --- a/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala +++ b/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala @@ -36,7 +36,7 @@ object StorageHazelcastSpec extends ZIOSpecDefault { } def spec: Spec[TestEnvironment with Scope, Any] = - suite("StorageRedisSpec")( + suite("StorageHazelcastSpec")( test("save and get pods") { val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0")) .map(p => p.address -> p) From e4088c471bda34ff8be5a0ebf6a2448dfc020f0d Mon Sep 17 00:00:00 2001 From: Lewis Date: Thu, 17 Oct 2024 00:26:38 -0400 Subject: [PATCH 3/3] feat(hazelcast): make requested changes --- build.sbt | 3 ++- .../main/scala/com/devsisters/shardcake/ShardManager.scala | 2 +- .../scala/com/devsisters/shardcake/StorageHazelcast.scala | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index a48d458f..b920bfa4 100644 --- a/build.sbt +++ b/build.sbt @@ -56,6 +56,7 @@ lazy val root = project healthK8s, storageRedis, storageRedisson, + storageHazelcast, serializationKryo, grpcProtocol, examples @@ -142,7 +143,7 @@ lazy val storageRedisson = project ) ) -lazy val storageHazelcase = project +lazy val storageHazelcast = project .in(file("storage-hazelcast")) .settings(name := "shardcake-storage-hazelcast") .settings(commonSettings) diff --git a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala index 9727754b..9408ea9f 100644 --- a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala +++ b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala @@ -1,4 +1,4 @@ -package com.devsisters.shardcake + package com.devsisters.shardcake import com.devsisters.shardcake.ShardManager._ import com.devsisters.shardcake.interfaces._ diff --git a/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala index 4058e44e..dd2b3094 100644 --- a/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala +++ b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala @@ -11,7 +11,7 @@ import scala.jdk.CollectionConverters._ object StorageHazelcast { /** - * A layer that returns a Storage implementation using Redis + * A layer that returns a Storage implementation using Hazelcast */ val live: ZLayer[HazelcastInstance with HazelcastConfig, Nothing, Storage] = ZLayer { @@ -67,7 +67,7 @@ object StorageHazelcast { .toMap ) def savePods(pods: Map[PodAddress, Pod]): Task[Unit] = - ZIO.attemptBlocking(podsMap.putAll(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit + ZIO.fromCompletionStage(podsMap.putAllAsync(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit } } }