Deploying to kafka: initial changes, a separate service #45

Merged 1 commit on Dec 1, 2023
@@ -10,7 +10,7 @@ import com.mwam.kafkakewl.common.http.HttpServer
 import com.mwam.kafkakewl.common.persistence.{KafkaPersistentStore, PersistentStore}
 import com.mwam.kafkakewl.common.telemetry.GlobalTracer
 import com.mwam.kafkakewl.deploy.endpoints.*
-import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsService
+import com.mwam.kafkakewl.deploy.services.{TopologyDeploymentsService, TopologyDeploymentsToKafkaService}
 import com.mwam.kafkakewl.domain.config.KafkaClientConfig
 import sttp.tapir.server.metrics.zio.ZioMetrics
 import sttp.tapir.server.ziohttp.{ZioHttpInterpreter, ZioHttpServerOptions}
@@ -71,6 +71,7 @@ object Main extends ZIOAppDefault {
       MetricsConfig.live,
       KafkaPersistentStore.live,
       TopologyDeploymentsService.live,
+      TopologyDeploymentsToKafkaService.live,
       DeploymentsEndpoints.live,
       DeploymentsServerEndpoints.live,
       Endpoints.live,
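In the entry point (`Main`), the only change is registering the extra layer; ZIO's layer composition then supplies `TopologyDeploymentsToKafkaService` to `TopologyDeploymentsService.live`, which now declares it as a dependency. A minimal sketch of that wiring pattern in isolation, using hypothetical stand-in services rather than the real kafkakewl types:

```scala
import zio.*

// Hypothetical stand-ins that mirror the shape of the real services.
class KafkaService
object KafkaService {
  val live: ZLayer[Any, Nothing, KafkaService] = ZLayer.succeed(new KafkaService)
}

class DeploymentsService(private val kafka: KafkaService)
object DeploymentsService {
  // Declares KafkaService as a dependency, just like TopologyDeploymentsService.live does.
  val live: ZLayer[KafkaService, Nothing, DeploymentsService] =
    ZLayer.fromFunction((kafka: KafkaService) => new DeploymentsService(kafka))
}

object WiringSketch extends ZIOAppDefault {
  // provide() wires KafkaService into DeploymentsService automatically,
  // so adding one more .live to the list is enough.
  def run =
    ZIO.serviceWith[DeploymentsService](_ => ())
      .provide(DeploymentsService.live, KafkaService.live)
}
```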
@@ -13,13 +13,15 @@ import zio.*
 
 class TopologyDeploymentsService private (
     private val persistentStore: PersistentStore,
+    private val topologyDeploymentsToKafkaService: TopologyDeploymentsToKafkaService,
     private val mutex: Semaphore,
     private val topologyDeploymentsRef: Ref[TopologyDeployments]
 ) {
   def deploy(deployments: Deployments): IO[PostDeploymentsFailure, DeploymentsSuccess] =
     mutex.withPermit {
       for {
         _ <- ZIO.logInfo(s"deploying $deployments")
+
         // TODO authorization
 
         // Validation before deployment
@@ -29,20 +31,29 @@ class TopologyDeploymentsService private (
           .toZIOParallelErrors
           .mapError(DeploymentsFailure.validation)
 
-        // TODO performing the kafka deployment itself, returns the new TopologyDeployments
-        // TODO persisting the new TopologyDeployments
-        // TODO publishing the change-log messages
-        // TODO update the in-memory state
+        // Performing the actual deployment to kafka (dryRun is respected here)
+        success <- topologyDeploymentsToKafkaService
+          .deploy(deployments)
+          .logError("deploying TopologyDeployments")
+          .mapError(DeploymentsFailure.deployment)
+
-        // Just same fake topology deployments for now
+        // Creating the new TopologyDeployments from the statuses
         topologyDeployments = deployments.deploy
-          .map(t => (t.id, TopologyDeployment(t.id, TopologyDeploymentStatus(), Some(t))))
+          .map(t => (t.id, TopologyDeployment(t.id, success.statuses(t.id), Some(t))))
           .toMap ++ deployments.delete
-          .map(tid => (tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None)))
+          .map(tid => (tid, TopologyDeployment(tid, success.statuses(tid), None)))
           .toMap
 
-        _ <- persistentStore.save(topologyDeployments).logError("saving TopologyDeployments").mapError(DeploymentsFailure.persistence)
-        _ <- topologyDeploymentsRef.update { _ ++ topologyDeployments -- deployments.delete }
+        _ <- (for {
+          // Persisting the new TopologyDeployments
+          _ <- persistentStore.save(topologyDeployments).logError("saving TopologyDeployments").mapError(DeploymentsFailure.persistence)
+
+          // Updating the in-memory state if everything succeeded so far
+          _ <- topologyDeploymentsRef.update {
+            _ ++ topologyDeployments -- deployments.delete
+          }
+        } yield ()).unless(deployments.options.dryRun) // Not making any actual changes if dryRun = true
 
         _ <- ZIO.logInfo(s"finished deploying $deployments")
       } yield DeploymentsSuccess(
         topologyDeployments
@@ -62,14 +73,15 @@
 }
 
 object TopologyDeploymentsService {
-  def live: ZLayer[PersistentStore, Nothing, TopologyDeploymentsService] =
+  def live: ZLayer[PersistentStore & TopologyDeploymentsToKafkaService, Nothing, TopologyDeploymentsService] =
     ZLayer.fromZIO {
       for {
         persistentStore <- ZIO.service[PersistentStore]
+        topologyDeploymentsToKafkaService <- ZIO.service[TopologyDeploymentsToKafkaService]
         // TODO what happens if we fail here?
         topologyDeployments <- persistentStore.loadLatest().orDie
         mutex <- Semaphore.make(permits = 1)
         topologyDeploymentsRef <- Ref.make(topologyDeployments)
-      } yield TopologyDeploymentsService(persistentStore, mutex, topologyDeploymentsRef)
+      } yield TopologyDeploymentsService(persistentStore, topologyDeploymentsToKafkaService, mutex, topologyDeploymentsRef)
     }
 }
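With this change `deploy` runs the full pipeline under the semaphore: validate, deploy to kafka via the new service, rebuild the `TopologyDeployments` from the per-topology statuses it returns, and then persist and update the in-memory `Ref` only when `dryRun` is false. The skip is done with ZIO's `unless` combinator, which discards the wrapped effect when its condition is true. A small self-contained illustration of that pattern (the `Ref` and flag here are hypothetical, not the real service state):

```scala
import zio.*

object DryRunSketch extends ZIOAppDefault {

  // Builds the persist/update effect unconditionally, but only runs it when dryRun is false.
  def persistAndUpdate(state: Ref[Map[String, String]], changes: Map[String, String], dryRun: Boolean): UIO[Unit] =
    (for {
      _ <- ZIO.logInfo(s"persisting ${changes.size} change(s)")
      _ <- state.update(_ ++ changes)
    } yield ()).unless(dryRun).unit

  def run = for {
    state  <- Ref.make(Map.empty[String, String])
    _      <- persistAndUpdate(state, Map("topic-a" -> "deployed"), dryRun = true)  // no log, no state change
    _      <- persistAndUpdate(state, Map("topic-a" -> "deployed"), dryRun = false) // logs and updates the state
    result <- state.get
    _      <- ZIO.logInfo(s"final state: $result")
  } yield ()
}
```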
@@ -0,0 +1,30 @@
+/*
+ * SPDX-FileCopyrightText: 2023 Marshall Wace <[email protected]>
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.mwam.kafkakewl.deploy.services
+
+import com.mwam.kafkakewl.domain.{Deployments, DeploymentsSuccess, TopologyDeploymentStatus}
+import zio.*
+
+class TopologyDeploymentsToKafkaService {
+  def deploy(deployments: Deployments): Task[DeploymentsSuccess] = {
+    ZIO.succeed {
+      // For now we just return empty statuses for all deploy/delete topologies
+      DeploymentsSuccess(
+        deployments.deploy.map(topology => (topology.id, TopologyDeploymentStatus())).toMap ++
+          deployments.delete.map(tid => (tid, TopologyDeploymentStatus())).toMap
+      )
+    }
+  }
+}
+
+object TopologyDeploymentsToKafkaService {
+  def live: ZLayer[Any, Nothing, TopologyDeploymentsToKafkaService] =
+    // TODO proper dependencies, etc...
+    ZLayer.succeed {
+      TopologyDeploymentsToKafkaService()
+    }
+}
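The new service is deliberately a stub: `deploy` succeeds immediately with an empty `TopologyDeploymentStatus` for every deployed/deleted topology, and `live` has no dependencies yet (see the TODO). One possible direction for a real implementation, sketched here with the plain Kafka admin client and a simplified topology type — the names, config keys, and topology shape below are illustrative, not the actual kafkakewl design:

```scala
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import zio.*

import java.util.Properties
import scala.jdk.CollectionConverters.*

// Hypothetical, much simplified stand-in for the real kafkakewl Topology type.
final case class SimpleTopology(name: String, partitions: Int, replicationFactor: Short)

// Sketch only: creates the topics described by the topologies via the Kafka admin client,
// wrapping the blocking calls in ZIO.
class KafkaTopologyDeployer(adminClient: AdminClient) {
  def deploy(topologies: List[SimpleTopology]): Task[Unit] =
    ZIO.attemptBlocking {
      val newTopics = topologies.map(t => new NewTopic(t.name, t.partitions, t.replicationFactor))
      adminClient.createTopics(newTopics.asJava).all().get()
    }.unit
}

object KafkaTopologyDeployer {
  // The admin client is acquired as a scoped resource so it is closed together with the layer.
  def live(bootstrapServers: String): ZLayer[Any, Throwable, KafkaTopologyDeployer] =
    ZLayer.scoped {
      ZIO
        .fromAutoCloseable(ZIO.attempt {
          val props = new Properties()
          props.put("bootstrap.servers", bootstrapServers)
          AdminClient.create(props)
        })
        .map(client => new KafkaTopologyDeployer(client))
    }
}
```

Since the PR has `TopologyDeploymentsService` consume `success.statuses` keyed by topology id, a real implementation would also need to map admin-client results back into per-topology `TopologyDeploymentStatus` values rather than returning `Unit`.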
@@ -9,6 +9,7 @@ package com.mwam.kafkakewl.domain
 import zio.NonEmptyChunk
 
 final case class DeploymentOptions(
+    dryRun: Boolean = true,
     // TODO make allowing unsafe operations more granular if needed
     allowUnsafe: Boolean = false
 )
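`dryRun` defaults to `true`, so the safe behaviour is the default: a plain deploy request is validated and the kafka-level changes are computed with dryRun respected, but nothing is saved to the persistent store or applied to the in-memory state until the caller explicitly opts out. A minimal sketch of constructing the options (the surrounding `Deployments` value that carries them is omitted):

```scala
val validateOnly = DeploymentOptions()               // dryRun = true, allowUnsafe = false: report what would happen
val applyChanges = DeploymentOptions(dryRun = false) // actually persist and update the in-memory state
```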