feat(ingestion/airflow): Add support for multiple datahub emitters #12398

Merged
Changes from 21 commits
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
@@ -1,14 +1,18 @@
 from enum import Enum
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Union

 from airflow.configuration import conf
 from pydantic import root_validator
 from pydantic.fields import Field

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import AllowDenyPattern, ConfigModel

 if TYPE_CHECKING:
-    from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
+    from datahub_airflow_plugin.hooks.datahub import (
+        DatahubCompositeHook,
+        DatahubGenericHook,
+    )


 class DatajobUrl(Enum):
@@ -27,6 +31,8 @@
     # DataHub hook connection ID.
     datahub_conn_id: str

+    _datahub_connection_ids: List[str]
+
     # Cluster to associate with the pipelines and tasks. Defaults to "prod".
     cluster: str = builder.DEFAULT_FLOW_CLUSTER
@@ -68,11 +74,25 @@

     disable_openlineage_plugin: bool = True

-    def make_emitter_hook(self) -> "DatahubGenericHook":
+    def make_emitter_hook(self) -> Union["DatahubGenericHook", "DatahubCompositeHook"]:
Inline review comment (Collaborator):
just a thought, not necessary to do: should the DatahubGenericHook be the thing responsible for constructing the composite emitter?

e.g. you can pass it either a string or a list of strings, and it does the right thing
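A minimal sketch of that idea (a hypothetical helper for illustration only; the merged PR instead keeps this dispatch inside make_emitter_hook):

```python
from typing import List, Union

from datahub_airflow_plugin.hooks.datahub import (
    DatahubCompositeHook,
    DatahubGenericHook,
)


def make_hook(
    conn_id: Union[str, List[str]],
) -> Union[DatahubGenericHook, DatahubCompositeHook]:
    # Normalize "conn_a, conn_b" or ["conn_a", "conn_b"] into a list of ids.
    conn_ids = (
        [c.strip() for c in conn_id.split(",")]
        if isinstance(conn_id, str)
        else conn_id
    )
    # Single id: plain hook; multiple ids: composite hook.
    if len(conn_ids) == 1:
        return DatahubGenericHook(conn_ids[0])
    return DatahubCompositeHook(conn_ids)
```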

         # This is necessary to avoid issues with circular imports.
-        from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
-
-        return DatahubGenericHook(self.datahub_conn_id)
+        from datahub_airflow_plugin.hooks.datahub import (
+            DatahubCompositeHook,
+            DatahubGenericHook,
+        )
+
+        if len(self._datahub_connection_ids) == 1:
+            return DatahubGenericHook(self._datahub_connection_ids[0])
+        else:
+            return DatahubCompositeHook(self._datahub_connection_ids)
+
+    @root_validator(skip_on_failure=True)
+    def split_conn_ids(cls, values: Dict) -> Dict:
+        if not values.get("datahub_conn_id"):
+            raise ValueError("datahub_conn_id is required")
+
+        conn_ids = values.get("datahub_conn_id", "").split(",")
+        cls._datahub_connection_ids = [conn_id.strip() for conn_id in conn_ids]
+        return values


 def get_lineage_config() -> DatahubLineageConfig:
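For context, the net effect of this file's changes: the plugin's conn_id setting now accepts a comma-separated list of Airflow connection ids, and make_emitter_hook() picks a plain or composite hook accordingly. A hypothetical airflow.cfg fragment (connection names invented for illustration; the integration test below drives the same setting through the AIRFLOW__DATAHUB__CONN_ID env var):

```ini
[datahub]
# Whitespace around the commas is stripped by the split_conn_ids validator.
conn_id = datahub_rest_primary, datahub_rest_secondary
```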
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py

@@ -29,6 +29,11 @@
 TASK_ON_RETRY_CALLBACK = "on_retry_callback"


+def load_config_v22():
+    plugin_config = get_lineage_config()
+    return plugin_config
+
+
 def get_task_inlets_advanced(task: BaseOperator, context: Any) -> Iterable[Any]:
     # TODO: Fix for https://github.com/apache/airflow/commit/1b1f3fabc5909a447a6277cafef3a0d4ef1f01ae
     # in Airflow 2.4.
@@ -99,9 +104,7 @@
     task_inlets = get_task_inlets_advanced(task, context)
     task_outlets = get_task_outlets(task)

-    emitter = (
-        DatahubGenericHook(config.datahub_conn_id).get_underlying_hook().make_emitter()
-    )
+    emitter = config.make_emitter_hook().make_emitter()

     dataflow = AirflowGenerator.generate_dataflow(
         config=config,
@@ -217,7 +220,7 @@

 def _wrap_pre_execution(pre_execution):
     def custom_pre_execution(context):
-        config = get_lineage_config()
+        config = load_config_v22()

         if config.enabled:
             context["_datahub_config"] = config
             datahub_pre_execution(context)
@@ -231,7 +234,7 @@

 def _wrap_on_failure_callback(on_failure_callback):
     def custom_on_failure_callback(context):
-        config = get_lineage_config()
+        config = load_config_v22()

         if config.enabled:
             context["_datahub_config"] = config
             try:
@@ -251,7 +254,7 @@

 def _wrap_on_success_callback(on_success_callback):
     def custom_on_success_callback(context):
-        config = get_lineage_config()
+        config = load_config_v22()
         if config.enabled:
             context["_datahub_config"] = config
             try:
@@ -271,7 +274,8 @@

 def _wrap_on_retry_callback(on_retry_callback):
     def custom_on_retry_callback(context):
-        config = get_lineage_config()
+        config = load_config_v22()
+
         if config.enabled:
             context["_datahub_config"] = config
             try:
@@ -363,7 +367,7 @@

     _patch_policy(settings)

-    plugin_config = get_lineage_config()
+    plugin_config = load_config_v22()
     telemetry.telemetry_instance.ping(
         "airflow-plugin-init",
         {
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py

@@ -1,8 +1,9 @@
-from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, Union

 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook

+from datahub.emitter.composite_emitter import CompositeEmitter
 from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (

@@ -303,3 +304,45 @@

     # Retained for backwards compatibility.
     emit_mces = emit
+
+
+class DatahubCompositeHook(BaseHook):
+    """
+    A hook that can emit metadata to multiple DataHub instances.
+
+    :param datahub_conn_ids: References to the DataHub connections.
+    :type datahub_conn_ids: List[str]
+    """
+
+    hooks: List[DatahubGenericHook] = []
+
+    def __init__(self, datahub_conn_ids: List[str]) -> None:
+        self.datahub_conn_ids = datahub_conn_ids
+
+    def make_emitter(self) -> CompositeEmitter:
+        print(f"Create emitters for {self.datahub_conn_ids}")
+        return CompositeEmitter(
+            [
+                self._get_underlying_hook(conn_id).make_emitter()
+                for conn_id in self.datahub_conn_ids
+            ]
+        )

+    def emit(
+        self,
+        items: Sequence[
+            Union[
+                MetadataChangeEvent,
+                MetadataChangeProposal,
+                MetadataChangeProposalWrapper,
+            ]
+        ],
+    ) -> None:
+        emitter = self.make_emitter()
+
+        for item in items:
+            print(f"emitting item {item}")
+            emitter.emit(item)
+
+    def _get_underlying_hook(self, conn_id: str) -> DatahubGenericHook:
+        return DatahubGenericHook(conn_id)
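A quick usage sketch of the new hook (connection ids invented for illustration; each must refer to an existing Airflow connection of a DataHub type, e.g. the datahub-file connections used in the integration test below):

```python
from datahub_airflow_plugin.hooks.datahub import DatahubCompositeHook

# One underlying DatahubGenericHook, and hence one emitter, per connection id.
hook = DatahubCompositeHook(["datahub_rest_primary", "datahub_rest_secondary"])
emitter = hook.make_emitter()  # CompositeEmitter fanning out to both instances
```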
metadata-ingestion-modules/airflow-plugin: integration tests

@@ -50,6 +50,7 @@ class AirflowInstance:
     password: str

     metadata_file: pathlib.Path
+    metadata_file2: pathlib.Path

     @property
     def airflow_url(self) -> str:
@@ -178,6 +179,7 @@ def _run_airflow(
     tmp_path: pathlib.Path,
     dags_folder: pathlib.Path,
     is_v1: bool,
+    multiple_connections: bool,
 ) -> Iterator[AirflowInstance]:
     airflow_home = tmp_path / "airflow_home"
     print(f"Using airflow home: {airflow_home}")
@@ -189,7 +191,9 @@
     print(f"Using airflow port: {airflow_port}")

     datahub_connection_name = "datahub_file_default"
+    datahub_connection_name_2 = "datahub_file_default_2"
     meta_file = tmp_path / "datahub_metadata.json"
+    meta_file2 = tmp_path / "datahub_metadata_2.json"

     environment = {
         **os.environ,
@@ -204,7 +208,9 @@
         "AIRFLOW__API__AUTH_BACKEND": "airflow.api.auth.backend.basic_auth",
         # Configure the datahub plugin and have it write the MCPs to a file.
         "AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True",
-        "AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name,
+        "AIRFLOW__DATAHUB__CONN_ID": f"{datahub_connection_name}, {datahub_connection_name_2}"
+        if multiple_connections
+        else datahub_connection_name,
         "AIRFLOW__DATAHUB__DAG_FILTER_STR": f'{{ "deny": ["{DAG_TO_SKIP_INGESTION}"] }}',
         f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection(
             conn_id="datahub_file_default",
@@ -251,6 +257,13 @@ def _run_airflow(
         "SQLALCHEMY_SILENCE_UBER_WARNING": "1",
     }

+    if multiple_connections:
+        environment[f"AIRFLOW_CONN_{datahub_connection_name_2.upper()}"] = Connection(
+            conn_id="datahub_file_default2",
+            conn_type="datahub-file",
+            host=str(meta_file2),
+        ).get_uri()
+
     if not HAS_AIRFLOW_STANDALONE_CMD:
         raise pytest.skip("Airflow standalone command is not available")

@@ -315,6 +328,7 @@ def _run_airflow(
         username=airflow_username,
         password=airflow_password,
         metadata_file=meta_file,
+        metadata_file2=meta_file2,
     )

     yield airflow_instance
@@ -355,10 +369,11 @@ class DagTestCase:
     success: bool = True

     v2_only: bool = False
+    multiple_connections: bool = False


 test_cases = [
-    DagTestCase("simple_dag"),
+    DagTestCase("simple_dag", multiple_connections=True),
     DagTestCase("basic_iolets"),
     DagTestCase("dag_to_skip", v2_only=True),
     DagTestCase("snowflake_operator", success=False, v2_only=True),
@@ -441,7 +456,10 @@ def test_airflow_plugin(
     dag_id = test_case.dag_id

     with _run_airflow(
-        tmp_path, dags_folder=DAGS_FOLDER, is_v1=is_v1
+        tmp_path,
+        dags_folder=DAGS_FOLDER,
+        is_v1=is_v1,
+        multiple_connections=test_case.multiple_connections,
     ) as airflow_instance:
         print(f"Running DAG {dag_id}...")
         _wait_for_dag_to_load(airflow_instance, dag_id)
@@ -491,6 +509,21 @@ def test_airflow_plugin(
             ],
         )

+        if test_case.multiple_connections:
+            _sanitize_output_file(airflow_instance.metadata_file2)
+            check_golden_file(
+                pytestconfig=pytestconfig,
+                output_path=airflow_instance.metadata_file2,
+                golden_path=golden_path,
+                ignore_paths=[
+                    # TODO: If we switched to Git urls, maybe we could get this to work consistently.
+                    r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
+                    r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
+                    r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
+                    r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
+                ],
+            )


 def _sanitize_output_file(output_path: pathlib.Path) -> None:
     # Overwrite some custom properties in the output file to make it easier to compare.
@@ -534,6 +567,7 @@ def _sanitize(obj: Any) -> None:
         tmp_path=pathlib.Path(tempfile.mkdtemp("airflow-plugin-test")),
         dags_folder=DAGS_FOLDER,
         is_v1=not HAS_AIRFLOW_LISTENER_API,
+        multiple_connections=False,
     ) as airflow_instance:
         # input("Press enter to exit...")
         breakpoint()
metadata-ingestion/src/datahub/emitter/composite_emitter.py (new file, 36 additions)

@@ -0,0 +1,36 @@
+from typing import Callable, List, Optional, Union
+
+from datahub.emitter.generic_emitter import Emitter
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
+    MetadataChangeEvent,
+    MetadataChangeProposal,
+)
+
+
+# Experimental composite emitter that allows multiple emitters to be used in a single ingestion job
+class CompositeEmitter(Emitter):
+    def __init__(self, emitters: List[Emitter]) -> None:
+        self.emitters = emitters

+    def emit(
+        self,
+        item: Union[
+            MetadataChangeEvent,
+            MetadataChangeProposal,
+            MetadataChangeProposalWrapper,
+        ],
+        callback: Optional[Callable[[Exception, str], None]] = None,
+    ) -> None:
+        callback_called = False
+        for emitter in self.emitters:
+            if not callback_called:
+                # We want to ensure that the callback is only called once, and we tie it to the first emitter we call.
+                emitter.emit(item, callback)
+                callback_called = True
+            else:
+                emitter.emit(item)

+    def flush(self) -> None:
+        for emitter in self.emitters:
+            emitter.flush()
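A minimal usage sketch of the new emitter (server URLs and the dataset URN are placeholders; DatahubRestEmitter, MetadataChangeProposalWrapper, and StatusClass are existing datahub classes):

```python
from datahub.emitter.composite_emitter import CompositeEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import StatusClass

# Fan a single metadata change proposal out to two DataHub instances.
emitter = CompositeEmitter(
    [
        DatahubRestEmitter("http://datahub-primary:8080"),
        DatahubRestEmitter("http://datahub-secondary:8080"),
    ]
)
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)",
    aspect=StatusClass(removed=False),
)
emitter.emit(mcp)  # a callback, if supplied, is tied to the first emitter only
emitter.flush()
```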