Changed column width
Signed-off-by: Jordan Hall <[email protected]>
j-hall-mwam committed Nov 28, 2023
1 parent 9edd3f4 commit 0f14847
Showing 26 changed files with 115 additions and 231 deletions.
3 changes: 2 additions & 1 deletion .scalafmt.conf
@@ -1,2 +1,3 @@
version = 3.7.17
runner.dialect = scala3
runner.dialect = scala3
maxColumn = 120
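The practical effect of the wider limit is what most of the diff below shows: wraps forced by scalafmt's previous default of maxColumn = 80 now fit on one line. A minimal illustration, borrowing the declaration from the endpoint utilities file further down (otherwise-default scalafmt settings assumed):

import sttp.tapir.*

// Formatted at the old default maxColumn = 80, scalafmt wrapped the right-hand side:
//   val apiEndpoint: PublicEndpoint[Unit, Unit, Unit, Any] =
//     endpoint.in("api" / "v1")
// Formatted at maxColumn = 120, the same declaration stays on a single line:
val apiEndpoint: PublicEndpoint[Unit, Unit, Unit, Any] = endpoint.in("api" / "v1")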
@@ -13,42 +13,38 @@ import sttp.tapir.EndpointIO.annotations.*
import zio.json.{JsonDecoder, JsonEncoder}

trait EndpointUtils {
val apiEndpoint: PublicEndpoint[Unit, Unit, Unit, Any] =
endpoint.in("api" / "v1")
val apiEndpoint: PublicEndpoint[Unit, Unit, Unit, Any] = endpoint.in("api" / "v1")
}

object EndpointUtils {

/** Body codec that will deserialize YAML into domain objects via YAML -> JSON
* -> domain marshalling. This codec does not support encoding responses as
* YAML (since circe-yaml does not support encoding) and YAML response are
* not expected to see use.
/** Body codec that will deserialize YAML into domain objects via YAML -> JSON -> domain marshalling. This codec does
* not support encoding responses as YAML (since circe-yaml does not support encoding) and YAML responses are not
* expected to see use.
*
* The Content-Type of YAML requests is associated with "text/plain".
*
* This works by using circe-yaml to convert the YAML input from the request
* body into JSON and using the existing tapir JSON marshalling onwards.
* This works by using circe-yaml to convert the YAML input from the request body into JSON and using the existing
* tapir JSON marshalling onwards.
*
* @tparam T
* Domain object target of the model
* @return
* A body mapping from string to T
*/
def yamlRequestBody[T: JsonEncoder: JsonDecoder: Schema]
: EndpointIO.Body[String, T] = {
def yamlRequestBody[T: JsonEncoder: JsonDecoder: Schema]: EndpointIO.Body[String, T] = {
val jsonDecode = sttp.tapir.json.zio.zioCodec[T]
stringBodyUtf8AnyFormat(
zioYamlCodec.mapDecode(jsonDecode.rawDecode)(jsonDecode.encode)
)
}

private val zioYamlCodec: Codec[String, String, TextPlain] =
sttp.tapir.Codec.anyString[String, TextPlain](CodecFormat.TextPlain()) {
s =>
io.circe.yaml.parser.parse(s).map(_.spaces2) match {
case Left(error) => DecodeResult.Error(s, error)
case Right(jsonString) => DecodeResult.Value(jsonString)
}
sttp.tapir.Codec.anyString[String, TextPlain](CodecFormat.TextPlain()) { s =>
io.circe.yaml.parser.parse(s).map(_.spaces2) match {
case Left(error) => DecodeResult.Error(s, error)
case Right(jsonString) => DecodeResult.Value(jsonString)
}
} { _ =>
throw UnsupportedOperationException("Encoding as YAML is unsupported")
}
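A hedged usage sketch of yamlRequestBody: the Ping type, the "ping" path and the derivations below are illustrative assumptions, not part of this commit.

import sttp.tapir.*
import sttp.tapir.generic.auto.*   // automatic Schema derivation for Ping
import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder}

final case class Ping(message: String)
object Ping {
  given JsonEncoder[Ping] = DeriveJsonEncoder.gen[Ping]
  given JsonDecoder[Ping] = DeriveJsonDecoder.gen[Ping]
}

// POST endpoint whose text/plain body is parsed as YAML and decoded into Ping
// via the YAML -> JSON -> zio-json route described in the scaladoc above.
val postPing: PublicEndpoint[Ping, Unit, Unit, Any] =
  endpoint.post
    .in("api" / "v1" / "ping")
    .in(EndpointUtils.yamlRequestBody[Ping])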
@@ -7,17 +7,9 @@
package com.mwam.kafkakewl.common.kafka

import com.mwam.kafkakewl.domain.config.KafkaClientConfig
import org.apache.kafka.clients.consumer.{
Consumer,
ConsumerRecords,
KafkaConsumer
}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{
BytesDeserializer,
Deserializer,
StringDeserializer
}
import org.apache.kafka.common.serialization.{BytesDeserializer, Deserializer, StringDeserializer}
import org.apache.kafka.common.utils.Bytes
import zio.*

@@ -92,9 +84,7 @@ object KafkaConsumerExtensions {
if (key == null) {
None
} else {
val value = valueBytesOption.map(valueBytes =>
valueDeserializer.deserialize(topic, valueBytes.get)
)
val value = valueBytesOption.map(valueBytes => valueDeserializer.deserialize(topic, valueBytes.get))
Some((key, value))
}
},
@@ -31,12 +31,7 @@ final case class BatchMessageEnvelope[Payload](
)

object BatchMessageEnvelopeJson {
import zio.json.{
DeriveJsonDecoder,
DeriveJsonEncoder,
JsonDecoder,
JsonEncoder
}
import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder}

given [Payload](using
JsonEncoder[Payload]
@@ -58,9 +53,7 @@ class KafkaPersistentStore(

private object KafkaSerde {
val key: Serde[Any, TopologyId] =
Serde.string.inmapM(s => ZIO.succeed(TopologyId(s)))(topologyId =>
ZIO.succeed(topologyId.value)
)
Serde.string.inmapM(s => ZIO.succeed(TopologyId(s)))(topologyId => ZIO.succeed(topologyId.value))
val value: Serde[Any, BatchMessageEnvelope[TopologyDeployment]] =
Serde.string.inmapM[Any, BatchMessageEnvelope[TopologyDeployment]](s =>
ZIO
@@ -86,9 +79,8 @@ class KafkaPersistentStore(
(duration, deserializedTopologies) =
deserializedTopologiesWithDuration

failedTopologies = deserializedTopologies.collect {
case (id, Left(jsonError)) =>
ZIO.logError(s"failed to deserialize topology $id: $jsonError")
failedTopologies = deserializedTopologies.collect { case (id, Left(jsonError)) =>
ZIO.logError(s"failed to deserialize topology $id: $jsonError")
}.toList

topologies = deserializedTopologies
@@ -108,24 +100,23 @@ class KafkaPersistentStore(
for {
txn <- kafkaProducer.createTransaction
batchSize = topologyDeployments.size
_ <- ZIO.foreachDiscard(topologyDeployments.values.zipWithIndex) {
(td, indexInBatch) =>
for {
recordMetadataWithDuration <- txn
.produce(
topicName,
td.topologyId,
BatchMessageEnvelope(batchSize, indexInBatch, td),
KafkaSerde.key,
KafkaSerde.value,
offset = None
)
.timed
(duration, recordMetadata) = recordMetadataWithDuration
_ <- ZIO.logInfo(
s"saved topology ${td.topologyId} into $topicName kafka topic: P#${recordMetadata.partition} @${recordMetadata.offset} in ${duration.toMillis / 1000.0} seconds"
_ <- ZIO.foreachDiscard(topologyDeployments.values.zipWithIndex) { (td, indexInBatch) =>
for {
recordMetadataWithDuration <- txn
.produce(
topicName,
td.topologyId,
BatchMessageEnvelope(batchSize, indexInBatch, td),
KafkaSerde.key,
KafkaSerde.value,
offset = None
)
} yield ()
.timed
(duration, recordMetadata) = recordMetadataWithDuration
_ <- ZIO.logInfo(
s"saved topology ${td.topologyId} into $topicName kafka topic: P#${recordMetadata.partition} @${recordMetadata.offset} in ${duration.toMillis / 1000.0} seconds"
)
} yield ()
}
} yield ()
}
@@ -7,10 +7,7 @@
package com.mwam.kafkakewl.deploy

import com.mwam.kafkakewl.common.http.HttpServer
import com.mwam.kafkakewl.common.persistence.{
KafkaPersistentStore,
PersistentStore
}
import com.mwam.kafkakewl.common.persistence.{KafkaPersistentStore, PersistentStore}
import com.mwam.kafkakewl.common.telemetry.GlobalTracer
import com.mwam.kafkakewl.deploy.endpoints.*
import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsService
@@ -48,10 +48,9 @@ class DeploymentsEndpoints() extends EndpointUtils {
.errorOut(queryDeploymentsFailureOutput)
.out(jsonBody[TopologyDeployment])

val getDeploymentsEndpoint
: PublicEndpoint[TopologyDeploymentQuery, QueryDeploymentsFailure, Seq[
TopologyDeployment
], Any] = deploymentsEndpoint
val getDeploymentsEndpoint: PublicEndpoint[TopologyDeploymentQuery, QueryDeploymentsFailure, Seq[
TopologyDeployment
], Any] = deploymentsEndpoint
.in(query[Option[String]]("filter"))
.in(query[Option[Boolean]]("with_topology"))
.in(query[Option[Int]]("offset"))
@@ -45,9 +45,7 @@ class TopologyDeploymentsService private (
)
)
.toMap ++ deployments.delete
.map(tid =>
(tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None))
)
.map(tid => (tid, TopologyDeployment(tid, TopologyDeploymentStatus(), None)))
.toMap

_ <- persistentStore
@@ -13,8 +13,7 @@ final case class DeploymentOptions(
allowUnsafe: Boolean = false
)

/** A deployment to be done, contains options, topologies to be deployed and
* topology-ids to be removed.
/** A deployment to be done, contains options, topologies to be deployed and topology-ids to be removed.
*
* @param options
* the deployment options
@@ -68,17 +67,13 @@ sealed trait PostDeploymentsFailure
/** All failures relating to performing/querying deployments.
*/
object DeploymentsFailure {
final case class NotFound(notFound: Seq[String])
extends QueryDeploymentsFailure
final case class NotFound(notFound: Seq[String]) extends QueryDeploymentsFailure
final case class Authorization(authorizationFailed: Seq[String])
extends PostDeploymentsFailure
with QueryDeploymentsFailure
final case class Validation(validationFailed: Seq[String])
extends PostDeploymentsFailure
final case class Deployment(deploymentFailed: Seq[String])
extends PostDeploymentsFailure
final case class Persistence(persistFailed: Seq[String])
extends PostDeploymentsFailure
final case class Validation(validationFailed: Seq[String]) extends PostDeploymentsFailure
final case class Deployment(deploymentFailed: Seq[String]) extends PostDeploymentsFailure
final case class Persistence(persistFailed: Seq[String]) extends PostDeploymentsFailure

def notFound(notFound: String*): NotFound = NotFound(notFound)
def authorization(throwable: Throwable): Authorization = Authorization(
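An illustrative helper, not part of this commit, showing how a caller might collapse the post-deployment failure cases above into messages (the function name is assumed):

// Exhaustive because PostDeploymentsFailure is sealed with exactly these four cases.
def describe(failure: PostDeploymentsFailure): String = failure match {
  case DeploymentsFailure.Authorization(reasons) => s"authorization failed: ${reasons.mkString(", ")}"
  case DeploymentsFailure.Validation(reasons)    => s"validation failed: ${reasons.mkString(", ")}"
  case DeploymentsFailure.Deployment(reasons)    => s"deployment failed: ${reasons.mkString(", ")}"
  case DeploymentsFailure.Persistence(reasons)   => s"persistence failed: ${reasons.mkString(", ")}"
}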
@@ -24,36 +24,28 @@ final case class Topic(

/** The local application id in the current topology's namespace.
*/
final case class ApplicationLocalId(value: String)
extends AnyVal
with StringValue
final case class ApplicationLocalId(value: String) extends AnyVal with StringValue
final case class UserId(value: String) extends AnyVal with StringValue
final case class Application(
id: ApplicationLocalId,
user: UserId
// TODO different application types
)

/** The local topic alias id in the current topology's namespace (fully
* qualified isn't really needed anyway because aliases aren't currently
* exposed to other topologies).
/** The local topic alias id in the current topology's namespace (fully qualified isn't really needed anyway because
* aliases aren't currently exposed to other topologies).
*/
final case class TopicAliasLocalId(value: String)
extends AnyVal
with StringValue
final case class TopicAliasLocalId(value: String) extends AnyVal with StringValue
final case class TopicAlias(
id: TopicAliasLocalId,
// TODO perhaps support other ways, e.g. list of topic ids, namespace? Although all these are expressible with regex easily
regex: String
)

/** The local application alias id in the current topology's namespace (fully
* qualified isn't really needed anyway because aliases aren't currently
* exposed to other topologies).
/** The local application alias id in the current topology's namespace (fully qualified isn't really needed anyway
* because aliases aren't currently exposed to other topologies).
*/
final case class ApplicationAliasLocalId(value: String)
extends AnyVal
with StringValue
final case class ApplicationAliasLocalId(value: String) extends AnyVal with StringValue
final case class ApplicationAlias(
id: ApplicationAliasLocalId,
// TODO perhaps support other ways, e.g. list of topic ids, namespace? Although all these are expressible with regex easily
@@ -64,15 +56,11 @@ final case class Aliases(
applications: Seq[ApplicationAlias] = Seq.empty
)

/** ApplicationFlexId can be an application alias or application id, local or
* fully qualified.
/** ApplicationFlexId can be an application alias or application id, local or fully qualified.
*/
final case class ApplicationFlexId(value: String)
extends AnyVal
with StringValue
final case class ApplicationFlexId(value: String) extends AnyVal with StringValue

/** TopicFlexId can be a local or fully qualified topic alias or fully qualified
* topic id.
/** TopicFlexId can be a local or fully qualified topic alias or fully qualified topic id.
*/
final case class TopicFlexId(value: String) extends AnyVal with StringValue
final case class ProducedTopic(topic: TopicFlexId)
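A purely illustrative sketch of the alias and flex-id types documented above; the values are made up:

// An alias matching every topic under an assumed "payments." prefix via regex.
val paymentsTopics: TopicAlias = TopicAlias(TopicAliasLocalId("payments-topics"), regex = "payments\\..*")

// A produced topic referenced by a flex id (local/fully qualified alias or fully qualified topic id).
val produced: ProducedTopic = ProducedTopic(TopicFlexId("payments.transactions"))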
@@ -6,14 +6,7 @@

package com.mwam.kafkakewl.domain

import zio.json.{
DeriveJsonDecoder,
DeriveJsonEncoder,
JsonDecoder,
JsonEncoder,
JsonFieldDecoder,
JsonFieldEncoder
}
import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder, JsonFieldDecoder, JsonFieldEncoder}

object TopologyJson {
given [T <: StringValue]: JsonEncoder[T] =
@@ -6,13 +6,7 @@

package com.mwam.kafkakewl.domain.validation

import com.mwam.kafkakewl.domain.{
Deployments,
Namespace,
Topology,
TopologyDeployments,
TopologyId
}
import com.mwam.kafkakewl.domain.{Deployments, Namespace, Topology, TopologyDeployments, TopologyId}

import scala.util.matching.Regex

@@ -14,11 +14,11 @@ import scala.annotation.targetName
package object validation {
type ValidationError = String

/** The validation does not produce anything, because the input is already a
* domain object that we're checking whether it's valid or not.
/** The validation does not produce anything, because the input is already a domain object that we're checking whether
* it's valid or not.
*
* TODO Maybe an abuse of the Validation[E, A] type? Ultimately all we need
* is a list of errors, when empty it means no errors.
* TODO Maybe an abuse of the Validation[E, A] type? Ultimately all we need is a list of errors, when empty it means
* no errors.
*/
type ValidationFailures = Validation[ValidationError, Unit]

@@ -37,8 +37,8 @@ package object validation {
validationFailures1: ValidationFailures,
validationFailures2: ValidationFailures
): ValidationFailures =
Validation.validateWith(validationFailures1, validationFailures2) {
(_, _) => successValue
Validation.validateWith(validationFailures1, validationFailures2) { (_, _) =>
successValue
}
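A brief sketch of combining two ValidationFailures with zio-prelude's Validation, mirroring the validateWith pattern above (the check names are invented):

import zio.prelude.Validation

val noErrors: ValidationFailures = Validation.succeed(())
val emptyId: ValidationFailures  = Validation.fail("topology id must not be empty")

// Accumulates errors from both sides and keeps the Unit success value.
val combined: ValidationFailures = Validation.validateWith(noErrors, emptyId)((_, _) => ())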

extension (validationFailures: ValidationFailures) {
@@ -12,8 +12,7 @@ sealed trait QueryFailure

object Failures {
final case class NotFound(notFound: Seq[String]) extends QueryFailure
final case class Authorization(authorizationFailed: Seq[String])
extends QueryFailure
final case class Authorization(authorizationFailed: Seq[String]) extends QueryFailure

def notFound(notFound: String*): NotFound = NotFound(notFound)
def authorization(throwable: Throwable): Authorization = Authorization(
(remaining file diffs not loaded)