-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/airflow): Add support for mutiple datahub emitter #12398
feat(ingestion/airflow): Add support for mutiple datahub emitter #12398
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should have multiple listeners - that would mean that we redo the SQL/lineage extraction work multiple times. imo the listener should remain a singleton.
We discussed the idea of a composite emitter - was there a reason we didn't go that route?
I'm sorry, I somehow thought this was a plan you wanted to implement in the future. |
@treff7es What I was thinking was
|
callback_called = False | ||
for emitter in self.emitters: | ||
if not callback_called: | ||
emitter.emit(item, callback) | ||
callback_called = True | ||
else: | ||
emitter.emit(item) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might also make sense to add a comment explaining that we want the callback to only be called once, and we tie that call to the first emitter. so we should explicitly clarify that errors from other emitters will be suppressed
callback_called = False | |
for emitter in self.emitters: | |
if not callback_called: | |
emitter.emit(item, callback) | |
callback_called = True | |
else: | |
emitter.emit(item) | |
for i, emitter in enumerate(self.emitters): | |
emitter.emit(item, callback if i == 0 else None) |
@@ -491,6 +500,21 @@ def test_airflow_plugin( | |||
], | |||
) | |||
|
|||
if test_case.multiple_connections and not is_v1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment still applies
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
Outdated
Show resolved
Hide resolved
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py
Outdated
Show resolved
Hide resolved
metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py
Outdated
Show resolved
Hide resolved
@@ -68,11 +71,19 @@ class DatahubLineageConfig(ConfigModel): | |||
|
|||
disable_openlineage_plugin: bool = True | |||
|
|||
def make_emitter_hook(self) -> "DatahubGenericHook": | |||
def make_emitter_hook(self) -> Union["DatahubGenericHook", "DatahubCompositeHook"]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a thought, not necessary to do: should the DatahubGenericHook
be the thing responsible for constructing the composite emitter?
e.g. you can pass it either a string or a list of strings, and it does the right thing
Checklist