Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(airflow): added support for jinja template for datahub emitter operator #11300

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from avrogen.dict_wrapper import DictWrapper
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

from datahub_airflow_plugin.hooks.datahub import (
Expand Down Expand Up @@ -45,11 +47,13 @@ class DatahubEmitterOperator(DatahubBaseOperator):
:type datahub_conn_id: str
"""

template_fields = ["metadata"]

# See above for why these mypy type issues are ignored here.
@apply_defaults # type: ignore[misc]
def __init__( # type: ignore[no-untyped-def]
self,
mces: List[MetadataChangeEvent],
mces: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]],
datahub_conn_id: str,
**kwargs,
):
Expand All @@ -59,5 +63,50 @@ def __init__( # type: ignore[no-untyped-def]
)
self.metadata = mces

def _render_template_fields(self, field_value, context, jinja_env):
    """Recursively render Jinja templates found inside a metadata value.

    :param field_value: the value to render. ``DictWrapper`` instances and
        lists are walked recursively; anything else is handed to the
        inherited ``render_template``.
    :param context: the template context passed through to ``render_template``.
    :param jinja_env: the Jinja environment passed through to ``render_template``.
    :return: the rendered value. ``DictWrapper`` and list inputs are updated
        in place and the same object is returned.
    """
    if isinstance(field_value, DictWrapper):
        # Snapshot the pairs first so that assigning attributes on the
        # wrapper cannot disturb the iteration over its items.
        for key, value in list(field_value.items()):
            setattr(
                field_value,
                key,
                self._render_template_fields(value, context, jinja_env),
            )
        return field_value

    if isinstance(field_value, list):
        # Write every rendered element back into the list. Immutable
        # elements such as strings cannot be rendered in place, so dropping
        # the return value would leave their templates unrendered.
        for index, item in enumerate(field_value):
            field_value[index] = self._render_template_fields(
                item, context, jinja_env
            )
        return field_value

    # Strings and every remaining type take the same path: the inherited
    # renderer decides what, if anything, needs rendering.
    return super().render_template(field_value, context, jinja_env)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we calling `super()._render_template_fields`?


def execute(self, context):
    """Render the templated metadata with the runtime context, then emit it.

    Rendering is done here because the execution context, with its runtime
    variables and other dynamic data, is only available while the task is
    executing. ``render_template`` itself is not overridden:
    ``_render_template_fields`` handles the recursive walk over
    ``DictWrapper`` objects and lists and falls back to the inherited
    ``render_template`` for everything else.

    :param context: the task execution context; when it is falsy the
        metadata is emitted without any rendering.
    """
    if context:
        jinja_env = self.get_template_env()

        for item in self.metadata:
            if isinstance(item, MetadataChangeProposalWrapper):
                # Render each instance attribute and store the result back.
                # The attribute pairs are snapshotted so that the setattr
                # calls cannot interfere with the iteration.
                for key, value in list(vars(item).items()):
                    setattr(
                        item,
                        key,
                        self._render_template_fields(value, context, jinja_env),
                    )
            elif isinstance(item, MetadataChangeEvent):
                # Rendered in place by the recursive helper.
                self._render_template_fields(item, context, jinja_env)

    self.generic_hook.get_underlying_hook().emit(self.metadata)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from datetime import datetime, timedelta

from airflow import DAG
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsV2Class,
DatasetPropertiesClass,
DatasetSnapshotClass,
)

from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

# Default arguments handed to the example DAG below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 1, 1),
    # Placeholder address; the scraped page showed an obfuscated value here.
    "email": ["jdoe@example.com"],
    "email_on_failure": False,
    "execution_timeout": timedelta(minutes=5),
}


# Example DAG: a single DatahubEmitterOperator whose metadata carries Jinja
# placeholders ("{{ ds }}", "{{ ts_nodash }}") in URNs, browse paths and
# custom properties, for both MetadataChangeProposalWrapper and
# MetadataChangeEvent entries.
with DAG(
    "datahub_emitter_operator_jinja_template_dag",
    default_args=default_args,
    description="An example dag with jinja template",
    schedule_interval=None,
    tags=["example_tag"],
    catchup=False,
    default_view="tree",
):
    add_custom_properties = DatahubEmitterOperator(
        task_id="datahub_emitter_operator_jinja_template_dag_task",
        mces=[
            # Placeholder inside the aspect only.
            MetadataChangeProposalWrapper(
                entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.mcpw_example,DEV)",
                aspect=BrowsePathsV2Class(
                    path=[BrowsePathEntryClass("mcpw_example {{ ds }}")],
                ),
            ),
            # Placeholder in both the entity URN and the aspect.
            MetadataChangeProposalWrapper(
                entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.mcpw_example_{{ ts_nodash }},DEV)",
                aspect=BrowsePathsV2Class(
                    path=[BrowsePathEntryClass("mcpw_example {{ ds }}")],
                ),
            ),
            # Placeholder inside a custom property value.
            MetadataChangeEvent(
                proposedSnapshot=DatasetSnapshotClass(
                    urn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.lineage_example,DEV)",
                    aspects=[
                        DatasetPropertiesClass(
                            customProperties={"jinjaTemplate": "{{ ds }}"}
                        )
                    ],
                ),
            ),
            # Placeholder in both the snapshot URN and a custom property.
            MetadataChangeEvent(
                proposedSnapshot=DatasetSnapshotClass(
                    urn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.lineage_example_{{ ts_nodash }},DEV)",
                    aspects=[
                        DatasetPropertiesClass(
                            customProperties={"jinjaTemplate": "{{ ds }}"}
                        )
                    ],
                ),
            ),
        ],
        datahub_conn_id="datahub_file_default",
    )
Loading
Loading