
Commit

add externalURL
acrylJonny committed Feb 19, 2025
1 parent 48f82a4 commit e9750a3
Showing 2 changed files with 89 additions and 67 deletions.
78 changes: 11 additions & 67 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -7,7 +7,6 @@
 import avro.schema
 import confluent_kafka
 import confluent_kafka.admin
-import pydantic
 from confluent_kafka.admin import (
     AdminClient,
     ConfigEntry,
@@ -16,13 +15,8 @@
 )
 from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
 
-from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.kafka import KafkaConsumerConnectionConfig
 from datahub.configuration.kafka_consumer_config import CallableConsumerConfig
-from datahub.configuration.source_common import (
-    DatasetSourceConfigMixin,
-    LowerCaseDatasetUrnConfigMixin,
-)
 from datahub.emitter import mce_builder
 from datahub.emitter.mce_builder import (
     make_data_platform_urn,
@@ -50,16 +44,15 @@
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.common.subtypes import DatasetSubTypes
+from datahub.ingestion.source.kafka.kafka_config import KafkaSourceConfig
 from datahub.ingestion.source.kafka.kafka_schema_registry_base import (
     KafkaSchemaRegistryBase,
 )
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
-    StatefulStaleMetadataRemovalConfig,
 )
 from datahub.ingestion.source.state.stateful_ingestion_base import (
-    StatefulIngestionConfigBase,
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.common import Status
@@ -90,64 +83,6 @@ class KafkaTopicConfigKeys(StrEnum):
     UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"
 
 
-class KafkaSourceConfig(
-    StatefulIngestionConfigBase,
-    DatasetSourceConfigMixin,
-    LowerCaseDatasetUrnConfigMixin,
-):
-    connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
-
-    topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
-    domain: Dict[str, AllowDenyPattern] = pydantic.Field(
-        default={},
-        description="A map of domain names to allow deny patterns. Domains can be urn-based (`urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810`) or bare (`13ae4d85-d955-49fc-8474-9004c663a810`).",
-    )
-    topic_subject_map: Dict[str, str] = pydantic.Field(
-        default={},
-        description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
-    )
-    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
-    schema_registry_class: str = pydantic.Field(
-        default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
-        description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
-    )
-    schema_tags_field: str = pydantic.Field(
-        default="tags",
-        description="The field name in the schema metadata that contains the tags to be added to the dataset.",
-    )
-    enable_meta_mapping: bool = pydantic.Field(
-        default=True,
-        description="When enabled, applies the mappings that are defined through the meta_mapping directives.",
-    )
-    meta_mapping: Dict = pydantic.Field(
-        default={},
-        description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.",
-    )
-    field_meta_mapping: Dict = pydantic.Field(
-        default={},
-        description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.",
-    )
-    strip_user_ids_from_email: bool = pydantic.Field(
-        default=False,
-        description="Whether or not to strip email id while adding owners using meta mappings.",
-    )
-    tag_prefix: str = pydantic.Field(
-        default="", description="Prefix added to tags during ingestion."
-    )
-    ignore_warnings_on_schema_type: bool = pydantic.Field(
-        default=False,
-        description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
-    )
-    disable_topic_record_naming_strategy: bool = pydantic.Field(
-        default=False,
-        description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
-    )
-    ingest_schemas_as_entities: bool = pydantic.Field(
-        default=False,
-        description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
-    )
-
-
 def get_kafka_consumer(
     connection: KafkaConsumerConnectionConfig,
 ) -> confluent_kafka.Consumer:
@@ -430,6 +365,7 @@ def _extract_record(
 
         # 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
         description: Optional[str] = None
+        external_url: Optional[str] = None
         if (
             schema_metadata is not None
             and isinstance(schema_metadata.platformSchema, KafkaSchemaClass)
@@ -481,8 +417,16 @@ def _extract_record(
                     mce_builder.make_global_tag_aspect_with_tag_list(all_tags)
                 )
 
+        if self.source_config.external_url_base:
+            # Remove trailing slash from base URL if present
+            base_url = self.source_config.external_url_base.rstrip("/")
+            external_url = f"{base_url}/{dataset_name}"
+
         dataset_properties = DatasetPropertiesClass(
-            name=dataset_name, customProperties=custom_props, description=description
+            name=dataset_name,
+            customProperties=custom_props,
+            description=description,
+            externalUrl=external_url,
         )
         dataset_snapshot.aspects.append(dataset_properties)

Codecov (codecov/patch) annotation on metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py: added lines #L422 - L423 were not covered by tests.
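For quick reference, a minimal runnable sketch of the new externalUrl behavior added above (the helper name and example URLs are illustrative, not part of this commit):

```python
from typing import Optional


def build_external_url(
    external_url_base: Optional[str], dataset_name: str
) -> Optional[str]:
    # Mirrors the logic added to _extract_record: strip any trailing
    # slash from the configured base URL, then append the topic name.
    if not external_url_base:
        return None
    return f"{external_url_base.rstrip('/')}/{dataset_name}"


# With or without a trailing slash, the result is the same:
assert (
    build_external_url("https://example.com/topics", "my-topic")
    == build_external_url("https://example.com/topics/", "my-topic")
    == "https://example.com/topics/my-topic"
)
```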
78 changes: 78 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_config.py
@@ -0,0 +1,78 @@
from typing import Dict, Optional

from pydantic import Field

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import (
    DatasetSourceConfigMixin,
    LowerCaseDatasetUrnConfigMixin,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)


class KafkaSourceConfig(
    StatefulIngestionConfigBase,
    DatasetSourceConfigMixin,
    LowerCaseDatasetUrnConfigMixin,
):
    connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

    topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
    domain: Dict[str, AllowDenyPattern] = Field(
        default={},
        description="A map of domain names to allow deny patterns. Domains can be urn-based (`urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810`) or bare (`13ae4d85-d955-49fc-8474-9004c663a810`).",
    )
    topic_subject_map: Dict[str, str] = Field(
        default={},
        description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
    )
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
    schema_registry_class: str = Field(
        default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
        description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
    )
    schema_tags_field: str = Field(
        default="tags",
        description="The field name in the schema metadata that contains the tags to be added to the dataset.",
    )
    enable_meta_mapping: bool = Field(
        default=True,
        description="When enabled, applies the mappings that are defined through the meta_mapping directives.",
    )
    meta_mapping: Dict = Field(
        default={},
        description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.",
    )
    field_meta_mapping: Dict = Field(
        default={},
        description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.",
    )
    strip_user_ids_from_email: bool = Field(
        default=False,
        description="Whether or not to strip email id while adding owners using meta mappings.",
    )
    tag_prefix: str = Field(
        default="", description="Prefix added to tags during ingestion."
    )
    ignore_warnings_on_schema_type: bool = Field(
        default=False,
        description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
    )
    disable_topic_record_naming_strategy: bool = Field(
        default=False,
        description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    )
    ingest_schemas_as_entities: bool = Field(
        default=False,
        description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
    )
    external_url_base: Optional[str] = Field(
        default=None,
        description="Base URL for external platform (e.g. Aiven) where topics can be viewed. The topic name will be appended to this base URL.",
    )
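A usage sketch, assuming KafkaSourceConfig validates like other pydantic-based DataHub configs (the field names come from this commit; the bootstrap address and console URL are made-up values):

```python
from datahub.ingestion.source.kafka.kafka_config import KafkaSourceConfig

# external_url_base may point at any external console (e.g. Aiven)
# that exposes topics at <base>/<topic-name>.
config = KafkaSourceConfig.parse_obj(
    {
        "connection": {"bootstrap": "localhost:9092"},
        "external_url_base": "https://console.example.com/kafka/topics/",
    }
)

# kafka.py strips the trailing slash before appending the topic name,
# so dataset external URLs come out as .../kafka/topics/<topic>.
assert config.external_url_base is not None
```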
