diff --git a/build.sbt b/build.sbt
index 96bac9d..b920bfa 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,17 +2,18 @@
 val scala213 = "2.13.11"
 val scala3   = "3.3.0"
 val allScala = Seq(scala213, scala3)
-val zioVersion            = "2.0.21"
-val zioGrpcVersion        = "0.6.0"
-val zioK8sVersion         = "2.1.1"
-val zioCacheVersion       = "0.2.3"
-val zioCatsInteropVersion = "23.1.0.0"
-val sttpVersion           = "3.9.3"
-val calibanVersion        = "2.5.3"
-val redis4catsVersion     = "1.5.2"
-val redissonVersion       = "3.27.1"
-val scalaKryoVersion      = "1.0.2"
-val testContainersVersion = "0.41.3"
+val zioVersion             = "2.0.21"
+val zioGrpcVersion         = "0.6.0"
+val zioK8sVersion          = "2.1.1"
+val zioCacheVersion        = "0.2.3"
+val zioCatsInteropVersion  = "23.1.0.0"
+val sttpVersion            = "3.9.3"
+val calibanVersion         = "2.5.3"
+val redis4catsVersion      = "1.5.2"
+val redissonVersion        = "3.27.1"
+val hazelcastClientVersion = "5.0.1"
+val scalaKryoVersion       = "1.0.2"
+val testContainersVersion  = "0.41.3"
 
 inThisBuild(
   List(
@@ -55,6 +56,7 @@ lazy val root = project
     healthK8s,
     storageRedis,
     storageRedisson,
+    storageHazelcast,
    serializationKryo,
     grpcProtocol,
     examples
@@ -141,6 +143,19 @@ lazy val storageRedisson = project
       )
   )
 
+lazy val storageHazelcast = project
+  .in(file("storage-hazelcast"))
+  .settings(name := "shardcake-storage-hazelcast")
+  .settings(commonSettings)
+  .dependsOn(core)
+  .settings(
+    libraryDependencies ++=
+      Seq(
+        "com.hazelcast"  % "hazelcast"       % hazelcastClientVersion,
+        "ch.qos.logback" % "logback-classic" % "1.4.7"
+      )
+  )
+
 lazy val serializationKryo = project
   .in(file("serialization-kryo"))
   .settings(name := "shardcake-serialization-kryo")
diff --git a/storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala
new file mode 100644
index 0000000..ee646f4
--- /dev/null
+++ b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/HazelcastConfig.scala
@@ -0,0 +1,12 @@
+package com.devsisters.shardcake
+
+/**
+ * The configuration for the Hazelcast storage implementation.
+ * @param assignmentsKey the name of the map used to store shard assignments
+ * @param podsKey the name of the map used to store registered pods
+ */
+case class HazelcastConfig(assignmentsKey: String, podsKey: String)
+
+object HazelcastConfig {
+  val default: HazelcastConfig = HazelcastConfig(assignmentsKey = "shard_assignments", podsKey = "pods")
+}
diff --git a/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala
new file mode 100644
index 0000000..dd2b309
--- /dev/null
+++ b/storage-hazelcast/src/main/scala/com/devsisters/shardcake/StorageHazelcast.scala
@@ -0,0 +1,73 @@
+package com.devsisters.shardcake
+
+import com.devsisters.shardcake.interfaces.Storage
+import com.hazelcast.core.HazelcastInstance
+import com.hazelcast.topic.{ Message, MessageListener }
+import zio.stream.ZStream
+import zio.{ Queue, Task, Unsafe, ZIO, ZLayer }
+
+import scala.jdk.CollectionConverters._
+
+object StorageHazelcast {
+
+  /**
+   * A layer that returns a Storage implementation using Hazelcast
+   */
+  val live: ZLayer[HazelcastInstance with HazelcastConfig, Nothing, Storage] =
+    ZLayer {
+      for {
+        config            <- ZIO.service[HazelcastConfig]
+        hazelcastInstance <- ZIO.service[HazelcastInstance]
+        assignmentsMap     = hazelcastInstance.getMap[String, String](config.assignmentsKey)
+        podsMap            = hazelcastInstance.getMap[String, String](config.podsKey)
+        assignmentsTopic   = hazelcastInstance.getTopic[String](config.assignmentsKey)
+      } yield new Storage {
+        def getAssignments: Task[Map[ShardId, Option[PodAddress]]] =
+          ZIO
+            .attemptBlocking(assignmentsMap.entrySet())
+            .map(
+              _.asScala.toList
+                .flatMap(entry =>
+                  entry.getKey.toIntOption.map(
+                    _ -> (if (entry.getValue.isEmpty) None
+                          else PodAddress(entry.getValue))
+                  )
+                )
+                .toMap
+            )
+
+        def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] =
+          ZIO.attemptBlocking(assignmentsMap.putAll(assignments.map { case (k, v) =>
+            k.toString -> v.fold("")(_.toString)
+          }.asJava)) *>
+            ZIO.attemptBlocking(assignmentsTopic.publish("ping")).unit
+
+        def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] =
+          ZStream.unwrap {
+            for {
+              queue   <- Queue.unbounded[String]
+              runtime <- ZIO.runtime[Any]
+              _       <- ZIO.attemptBlocking(
+                           assignmentsTopic.addMessageListener(
+                             new MessageListener[String] {
+                               def onMessage(msg: Message[String]): Unit =
+                                 Unsafe.unsafe(implicit unsafe => runtime.unsafe.run(queue.offer(msg.getMessageObject)))
+                             }
+                           )
+                         )
+            } yield ZStream.fromQueueWithShutdown(queue).mapZIO(_ => getAssignments)
+          }
+
+        def getPods: Task[Map[PodAddress, Pod]] =
+          ZIO
+            .attemptBlocking(podsMap.entrySet())
+            .map(
+              _.asScala
+                .flatMap(entry => PodAddress(entry.getKey).map(address => address -> Pod(address, entry.getValue)))
+                .toMap
+            )
+        def savePods(pods: Map[PodAddress, Pod]): Task[Unit] =
+          ZIO.fromCompletionStage(podsMap.putAllAsync(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit
+      }
+    }
+}
diff --git a/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala b/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala
new file mode 100644
index 0000000..67aa08f
--- /dev/null
+++ b/storage-hazelcast/src/test/scala/com/devsisters/shardcake/StorageHazelcastSpec.scala
@@ -0,0 +1,69 @@
+package com.devsisters.shardcake
+
+import com.devsisters.shardcake.interfaces.Storage
+import com.dimafeng.testcontainers.GenericContainer
+import com.hazelcast.client.HazelcastClient
+import com.hazelcast.client.config.{ ClientConfig, ClientNetworkConfig }
+import com.hazelcast.core.HazelcastInstance
+import zio.Clock.ClockLive
+import zio._
+import zio.stream.ZStream
+import zio.test.TestAspect.sequential
+import zio.test._
+
+object StorageHazelcastSpec extends ZIOSpecDefault {
+  val container: ZLayer[Any, Nothing, GenericContainer] =
+    ZLayer.scoped {
+      ZIO.acquireRelease {
+        ZIO.attemptBlocking {
+          val container = new GenericContainer(dockerImage = "hazelcast/hazelcast:5.1.2", exposedPorts = Seq(5701))
+          container.start()
+          container
+        }.orDie
+      }(container => ZIO.attemptBlocking(container.stop()).orDie)
+    }
+
+  val hazelcast: ZLayer[GenericContainer, Throwable, HazelcastInstance] =
+    ZLayer {
+      for {
+        container     <- ZIO.service[GenericContainer]
+        uri            = s"${container.host}:${container.mappedPort(container.exposedPorts.head)}"
+        hazelConfig    = new ClientConfig
+        hazelNetConfig = new ClientNetworkConfig
+        _              = hazelNetConfig.addAddress(uri)
+        _              = hazelConfig.setNetworkConfig(hazelNetConfig)
+      } yield HazelcastClient.newHazelcastClient(hazelConfig)
+    }
+
+  def spec: Spec[TestEnvironment with Scope, Any] =
+    suite("StorageHazelcastSpec")(
+      test("save and get pods") {
+        val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0"))
+          .map(p => p.address -> p)
+          .toMap
+        for {
+          _      <- ZIO.serviceWithZIO[Storage](_.savePods(expected))
+          actual <- ZIO.serviceWithZIO[Storage](_.getPods)
+        } yield assertTrue(expected == actual)
+      },
+      test("save and get assignments") {
+        val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
+        for {
+          _      <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
+          actual <- ZIO.serviceWithZIO[Storage](_.getAssignments)
+        } yield assertTrue(expected == actual)
+      },
+      test("assignments stream") {
+        val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None)
+        for {
+          p      <- Promise.make[Nothing, Map[Int, Option[PodAddress]]]
+          _      <- ZStream.serviceWithStream[Storage](_.assignmentsStream).runForeach(p.succeed(_)).fork
+          _      <- ClockLive.sleep(1.second)
+          _      <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected))
+          actual <- p.await
+        } yield assertTrue(expected == actual)
+      }
+    ).provideLayerShared(
+      container >>> hazelcast ++ ZLayer.succeed(HazelcastConfig.default) >>> StorageHazelcast.live
+    ) @@ sequential
+}
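For reference, a minimal wiring sketch (not part of the diff above) showing how an application could build the new storage layer from a Hazelcast client. The address "localhost:5701" and the object/value names are illustrative assumptions; only HazelcastConfig, StorageHazelcast.live and the Hazelcast client API come from the change itself:

// Sketch only: assumes a Hazelcast member reachable at localhost:5701.
import com.devsisters.shardcake.interfaces.Storage
import com.devsisters.shardcake.{ HazelcastConfig, StorageHazelcast }
import com.hazelcast.client.HazelcastClient
import com.hazelcast.client.config.ClientConfig
import com.hazelcast.core.HazelcastInstance
import zio._

object HazelcastStorageLayers {
  // Acquire a Hazelcast client and shut it down when the enclosing scope closes.
  val hazelcastInstance: ZLayer[Any, Throwable, HazelcastInstance] =
    ZLayer.scoped {
      ZIO.acquireRelease(
        ZIO.attemptBlocking {
          val config = new ClientConfig
          config.getNetworkConfig.addAddress("localhost:5701") // assumed address
          HazelcastClient.newHazelcastClient(config)
        }
      )(instance => ZIO.attemptBlocking(instance.shutdown()).orDie)
    }

  // Storage backed by Hazelcast, using the default map/topic names
  // ("shard_assignments" and "pods") from HazelcastConfig.default.
  val storage: ZLayer[Any, Throwable, Storage] =
    (hazelcastInstance ++ ZLayer.succeed(HazelcastConfig.default)) >>> StorageHazelcast.live
}

The resulting storage layer can then be supplied wherever Shardcake expects a Storage service, for example in place of the in-memory storage when wiring the Shard Manager or a pod.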