Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/kafka): Add optional externalURL base for link to external platform #12675

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 11 additions & 67 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import avro.schema
import confluent_kafka
import confluent_kafka.admin
import pydantic
from confluent_kafka.admin import (
AdminClient,
ConfigEntry,
Expand All @@ -16,13 +15,8 @@
)
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.kafka_consumer_config import CallableConsumerConfig
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
Expand Down Expand Up @@ -50,16 +44,15 @@
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.kafka.kafka_config import KafkaSourceConfig
from datahub.ingestion.source.kafka.kafka_schema_registry_base import (
KafkaSchemaRegistryBase,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
Expand Down Expand Up @@ -90,64 +83,6 @@
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
domain: Dict[str, AllowDenyPattern] = pydantic.Field(
default={},
description="A map of domain names to allow deny patterns. Domains can be urn-based (`urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810`) or bare (`13ae4d85-d955-49fc-8474-9004c663a810`).",
)
topic_subject_map: Dict[str, str] = pydantic.Field(
default={},
description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
schema_registry_class: str = pydantic.Field(
default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
)
schema_tags_field: str = pydantic.Field(
default="tags",
description="The field name in the schema metadata that contains the tags to be added to the dataset.",
)
enable_meta_mapping: bool = pydantic.Field(
default=True,
description="When enabled, applies the mappings that are defined through the meta_mapping directives.",
)
meta_mapping: Dict = pydantic.Field(
default={},
description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.",
)
field_meta_mapping: Dict = pydantic.Field(
default={},
description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.",
)
strip_user_ids_from_email: bool = pydantic.Field(
default=False,
description="Whether or not to strip email id while adding owners using meta mappings.",
)
tag_prefix: str = pydantic.Field(
default="", description="Prefix added to tags during ingestion."
)
ignore_warnings_on_schema_type: bool = pydantic.Field(
default=False,
description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
)
disable_topic_record_naming_strategy: bool = pydantic.Field(
default=False,
description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
)
ingest_schemas_as_entities: bool = pydantic.Field(
default=False,
description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
)


def get_kafka_consumer(
connection: KafkaConsumerConnectionConfig,
) -> confluent_kafka.Consumer:
Expand Down Expand Up @@ -430,6 +365,7 @@

# 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
description: Optional[str] = None
external_url: Optional[str] = None
if (
schema_metadata is not None
and isinstance(schema_metadata.platformSchema, KafkaSchemaClass)
Expand Down Expand Up @@ -481,8 +417,16 @@
mce_builder.make_global_tag_aspect_with_tag_list(all_tags)
)

if self.source_config.external_url_base:
# Remove trailing slash from base URL if present
base_url = self.source_config.external_url_base.rstrip("/")
external_url = f"{base_url}/{dataset_name}"

Check warning on line 423 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L422-L423

Added lines #L422 - L423 were not covered by tests

dataset_properties = DatasetPropertiesClass(
name=dataset_name, customProperties=custom_props, description=description
name=dataset_name,
customProperties=custom_props,
description=description,
externalUrl=external_url,
)
dataset_snapshot.aspects.append(dataset_properties)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Dict, Optional

from pydantic import Field

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)


class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
domain: Dict[str, AllowDenyPattern] = Field(
default={},
description="A map of domain names to allow deny patterns. Domains can be urn-based (`urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810`) or bare (`13ae4d85-d955-49fc-8474-9004c663a810`).",
)
topic_subject_map: Dict[str, str] = Field(
default={},
description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
schema_registry_class: str = Field(
default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
)
schema_tags_field: str = Field(
default="tags",
description="The field name in the schema metadata that contains the tags to be added to the dataset.",
)
enable_meta_mapping: bool = Field(
default=True,
description="When enabled, applies the mappings that are defined through the meta_mapping directives.",
)
meta_mapping: Dict = Field(
default={},
description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.",
)
field_meta_mapping: Dict = Field(
default={},
description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.",
)
strip_user_ids_from_email: bool = Field(
default=False,
description="Whether or not to strip email id while adding owners using meta mappings.",
)
tag_prefix: str = Field(
default="", description="Prefix added to tags during ingestion."
)
ignore_warnings_on_schema_type: bool = Field(
default=False,
description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
)
disable_topic_record_naming_strategy: bool = Field(
default=False,
description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
)
ingest_schemas_as_entities: bool = Field(
default=False,
description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
)
external_url_base: Optional[str] = Field(
default=None,
description="Base URL for external platform (e.g. Aiven) where topics can be viewed. The topic name will be appended to this base URL.",
)
Loading