Skip to content

Commit

Permalink
Functionality to add arbitrary metadata to AVRO JSON formatted schemas (
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnPreston authored Jul 9, 2024
1 parent 3653ca7 commit 9d207eb
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 76 deletions.
29 changes: 29 additions & 0 deletions cfn_kafka_admin/cfn_kafka_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,35 @@ def define_schema_definition_path(
definition = attribute.Definition
file_name = f"{topic_name}{SerializerDef[attribute.Serializer.name].value}{subject_suffix}Schema.json"

# Allows inserting metadata defined at the global or schema level automatically.
# This avoids having to edit every schema one by one to change properties.
# Only doing this for AVRO.
if attribute.Serializer == SerializerDef.AVRO:
try:
definition_dict = json.loads(definition)
if self.model.Schemas.Metadata and not attribute.Metadata:
to_add_metadata: dict = deepcopy(self.model.Schemas.Metadata)
elif not self.model.Schemas.Metadata and attribute.Metadata:
to_add_metadata: dict = deepcopy(attribute.Metadata)
elif self.model.Schemas.Metadata and attribute.Metadata:
from .common import recursive_merge

to_add_metadata = recursive_merge(
self.model.Schemas.Metadata, attribute.Metadata
)
else:
to_add_metadata: dict = {}
for key in to_add_metadata.keys():
if key not in definition_dict:
definition_dict.update(self.model.Schemas.Metadata)
print(f"Updated {file_name} with {key} metadata")
else:
print(f"Metadata {key} already defined in source schema.")
definition: str = json.dumps(definition_dict)
except json.JSONDecodeError:
print(f"Schema {file_name} is not a valid JSON. Not adding metadata.")
pass

if self.model.Schemas.S3Store and file_name:
s3_file_path = (
f"{self.model.Schemas.S3Store.PrefixPath}{FILE_PREFIX}{file_name}"
Expand Down
11 changes: 11 additions & 0 deletions cfn_kafka_admin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,14 @@ def setup_logging(logger_name: str = "kafka"):


KAFKA_LOG = setup_logging()


def recursive_merge(dict1, dict2):
    """Merge ``dict2`` into ``dict1`` in place and return ``dict1``.

    Keys whose values are dictionaries on BOTH sides are merged
    recursively; for any other key, the value from ``dict2`` overwrites
    (or adds to) ``dict1``. Note that ``dict1`` is mutated.
    """
    for merge_key, incoming in dict2.items():
        existing = dict1.get(merge_key)
        if isinstance(existing, dict) and isinstance(incoming, dict):
            # Both sides hold mappings for this key: descend and merge.
            dict1[merge_key] = recursive_merge(existing, incoming)
        else:
            # Scalar/list/mismatched types: dict2 wins outright.
            dict1[merge_key] = incoming
    return dict1
156 changes: 81 additions & 75 deletions cfn_kafka_admin/models/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Config:
BucketName: constr(
regex=r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]{0,61}[A-Za-z0-9])\Z"
)
PrefixPath: Optional[constr(regex=r"^[^/](.*)/$")] = ""
PrefixPath: constr(regex=r"^[^/](.*)/$") | None = ""


class CompatibilityMode(Enum):
Expand All @@ -51,8 +51,12 @@ class SerializerDef(Enum):

class TopicSchemaDef(BaseModel):
    # Schema definition attached to a topic attribute (Key/Value/Header).
    # Serializer drives serializer-specific handling (e.g. AVRO metadata injection).
    Serializer: SerializerDef
    Definition: str | dict[str, Any] | None = None
    CompatibilityMode: CompatibilityMode | None = "NONE"
    Metadata: dict[str, Any] | None = None
    """
    Metadata to automatically add to AVRO schemas.
    """


class BootstrapServers(BaseModel):
Expand Down Expand Up @@ -158,10 +162,10 @@ class DeletionPolicy2(Enum):


class TopicSchemas(BaseModel):
    # Optional Key/Value/Header schema definitions for a single topic.
    Key: TopicSchemaDef | None = None
    Value: TopicSchemaDef | None = None
    Header: TopicSchemaDef | None = None
    DeletionPolicy: DeletionPolicy2 | None = "Retain"
    """
    When set, overrides the DeletionPolicy set on the Topic of the schema. For safety, defaulting to Retain
    """
Expand Down Expand Up @@ -191,83 +195,81 @@ class CompressionType(Enum):


class TopicsSettings(BaseModel):
cleanup_policy: Optional[CleanupPolicy] = Field("delete", alias="cleanup.policy")
cleanup_policy: CleanupPolicy | None = Field("delete", alias="cleanup.policy")
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_cleanup.policy
"""
compression_type: Optional[CompressionType] = Field(None, alias="compression.type")
compression_type: CompressionType | None = Field(None, alias="compression.type")
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_compression.type
"""
delete_retention_ms: Optional[conint(ge=0)] = Field(
delete_retention_ms: conint(ge=0) | None = Field(
86400000, alias="delete.retention.ms"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_delete.retention.ms
"""
file_delete_delay_ms: Optional[conint(ge=0)] = Field(
file_delete_delay_ms: conint(ge=0) | None = Field(
60000, alias="file.delete.delay.ms"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_file.delete.delay.ms
"""
retention_ms: Optional[conint(ge=-1)] = Field(604800000, alias="retention.ms")
retention_ms: conint(ge=-1) | None = Field(604800000, alias="retention.ms")
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_retention.ms
"""
flush_messages: Optional[conint(ge=0)] = Field(
flush_messages: conint(ge=0) | None = Field(
9223372036854775807, alias="flush.messages"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_flush.messages
"""
flush_ms: Optional[conint(ge=0)] = Field(9223372036854775807, alias="flush.ms")
flush_ms: conint(ge=0) | None = Field(9223372036854775807, alias="flush.ms")
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_flush.ms
"""
index_interval_bytes: Optional[conint(ge=0)] = Field(
index_interval_bytes: conint(ge=0) | None = Field(
4096, alias="index.interval.bytes"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_index.interval.bytes
"""
max_compaction_lag_ms: Optional[conint(ge=0)] = Field(
max_compaction_lag_ms: conint(ge=0) | None = Field(
9223372036854775807, alias="max.compaction.lag.ms"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.compaction.lag.ms
"""
min_compaction_lag_ms: Optional[conint(ge=0)] = Field(
min_compaction_lag_ms: conint(ge=0) | None = Field(
None, alias="min.compaction.lag.ms"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_min.compaction.lag.ms
"""
min_cleanable_dirty_ratio: Optional[confloat(ge=0.0, le=1.0)] = Field(
min_cleanable_dirty_ratio: confloat(ge=0.0, le=1.0) | None = Field(
0.5, alias="min.cleanable.dirty.ratio"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_min.cleanable.dirty.ratio
"""
max_message_bytes: Optional[conint(ge=0)] = Field(
1048588, alias="max.message.bytes"
)
max_message_bytes: conint(ge=0) | None = Field(1048588, alias="max.message.bytes")
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_max.message.bytes
"""
confluent_key_schema_validation: Optional[bool] = Field(
confluent_key_schema_validation: bool | None = Field(
None, alias="confluent.key.schema.validation"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_confluent.key.schema.validation
"""
confluent_key_subject_name_strategy: Optional[str] = Field(
confluent_key_subject_name_strategy: str | None = Field(
None, alias="confluent.key.subject.name.strategy"
)
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_confluent.key.subject.name.strategy
"""
confluent_tier_enable: Optional[str] = Field(None, alias="confluent.tier.enable")
confluent_tier_enable: str | None = Field(None, alias="confluent.tier.enable")
"""
https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_confluent.tier.enable
"""
Expand Down Expand Up @@ -330,7 +332,7 @@ class Config:
"""
Name of the resource to apply the ACL for
"""
PatternType: Optional[PatternType] = "LITERAL"
PatternType: PatternType | None = "LITERAL"
"""
Pattern type for resource value
"""
Expand All @@ -350,7 +352,7 @@ class Config:
"""
Effect for the ACL.
"""
Host: Optional[str] = "*"
Host: str | None = "*"
"""
Specify the host for the ACL. Defaults to '*'
"""
class EwsKafkaSchema(BaseModel):
    """
    Resource to create Kafka schema subjects in your schema registry.
    """

    RegistryUrl: RegistryUrl | None = None
    RegistryUsername: RegistryUsername | None = None
    RegistryPassword: RegistryPassword | None = None
    RegistryUserInfo: RegistryUserInfo | None = None
    Subject: str | None = None
    Serializer: SerializerDef | None = None
    Definition: str | dict[str, Any] | None = None
    CompatibilityMode: CompatibilityMode | None = "NONE"
    ServiceToken: str | None = None
    """
    The Lambda Function ARN
    """
    PermanentlyDelete: bool | None = False
    """
    If set to true, the Schema is set to hard delete. Use carefully
    """
class EwsKafkaAcl(BaseModel):
    """
    Resource to create Kafka ACLs in your cluster.
    """

    Policies: list[PolicyDef] | None = Field(None, unique_items=True)


class Schemas(BaseModel):
    # Registry-wide defaults applied to all managed schemas.
    FunctionName: str | None = None
    """
    Name or ARN of the Schema Registry function to use
    """
    RegistryUrl: str | None = None
    RegistryUsername: RegistryUsername | None = None
    RegistryPassword: RegistryPassword | None = None
    RegistryUserInfo: RegistryUserInfo | None = None
    CompatibilityMode: CompatibilityMode | None = "NONE"
    DeletionPolicy: DeletionPolicy1 | None = "Retain"
    """
    When set, overrides the DeletionPolicy set as default for all schemas
    """
    S3Store: S3Store | None = None
    Metadata: dict[str, Any] | None = None
    """
    Metadata to automatically add to AVRO schemas.
    """


class EwsKafkaParameters(BaseModel):
    """
    Generic properties used to connect to Kafka cluster.
    """

    BootstrapServers: BootstrapServers | None = None
    SecurityProtocol: SecurityProtocol | None = "PLAINTEXT"
    SASLMechanism: SASLMechanism | None = "PLAIN"
    SASLUsername: SASLUsername | None = None
    SASLPassword: SASLPassword | None = None
    RegistryUrl: RegistryUrl | None = None
    RegistryUsername: RegistryUsername | None = None
    RegistryPassword: RegistryPassword | None = None
    CompatibilityMode: CompatibilityMode | None = "NONE"
    ClientConfig: dict[str, Any] | None = None
    """
    Client configuration as per the librdkafka settings. Incompatible with the SASL & SecurityProtocol properties
    """
Expand All @@ -431,22 +437,22 @@ class EwsKafkaTopic(BaseModel):

Name: Name
PartitionsCount: PartitionsCount
ReplicationFactor: Optional[ReplicationFactor] = None
BootstrapServers: Optional[BootstrapServers] = None
SecurityProtocol: Optional[SecurityProtocol] = "PLAINTEXT"
SASLMechanism: Optional[SASLMechanism] = "PLAIN"
SASLUsername: Optional[SASLUsername] = None
SASLPassword: Optional[SASLPassword] = None
Schema: Optional[TopicSchemas] = None
Settings: Optional[TopicsSettings] = None
ClientConfig: Optional[Dict[str, Any]] = None
ReplicationFactor: ReplicationFactor | None = None
BootstrapServers: BootstrapServers | None = None
SecurityProtocol: SecurityProtocol | None = "PLAINTEXT"
SASLMechanism: SASLMechanism | None = "PLAIN"
SASLUsername: SASLUsername | None = None
SASLPassword: SASLPassword | None = None
Schema: TopicSchemas | None = None
Settings: TopicsSettings | None = None
ClientConfig: dict[str, Any] | None = None
"""
Client configuration as per the librdkafka settings.
"""


class Policies(BaseModel):
    # Root-type model: the document node is itself a unique list of ACL policies.
    __root__: list[PolicyDef] = Field(..., unique_items=True)


class SchemasDef(BaseModel):
Expand All @@ -458,29 +464,29 @@ class AclsModel(BaseModel):


class Topics(BaseModel):
    # Topics section of the document: the topic list plus management defaults.
    Topics: list[EwsKafkaTopic] | None = None
    ReplicationFactor: ReplicationFactor | None = None
    FunctionName: str | None = None
    """
    Name or ARN of the Lambda function to use for Custom::KafkaTopic
    """
    DeletionPolicy: DeletionPolicy | None = "Retain"
    ImportExisting: bool | None = True
    """
    Whether to import existing topics on Create. Fails if set to false
    """


class ACLs(BaseModel):
    # ACLs section of the document: the policies plus the backing Lambda.
    Policies: Policies | None = None
    FunctionName: str | None = None
    """
    Name or ARN of the Lambda function to use for Custom::KafkaACL
    """


class Model(BaseModel):
    # Top-level document model: Globals is required, each section optional.
    Globals: EwsKafkaParameters
    Topics: Topics | None = None
    ACLs: ACLs | None = None
    Schemas: Schemas | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
"S3Store": {
"type": "object",
"additionalProperties": false,
"required": ["BucketName"],
"required": [
"BucketName"
],
"properties": {
"BucketName": {
"type": "string",
Expand All @@ -98,6 +100,10 @@
"pattern": "^[^/](.*)\/$"
}
}
},
"Metadata": {
"description": "Metadata to automatically add to AVRO schemas.",
"type": "object"
}
}
}
Expand Down
Loading

0 comments on commit 9d207eb

Please sign in to comment.