feat(ingestion/airflow): Add support for multiple datahub emitter #12398

treff7es
Contributor

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Jan 20, 2025
@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Jan 20, 2025
codecov bot commented Jan 20, 2025

Codecov Report

Attention: Patch coverage is 85.96491% with 8 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
...plugin/src/datahub_airflow_plugin/hooks/datahub.py | 75.00% | 4 Missing ⚠️
...n/src/datahub_airflow_plugin/datahub_plugin_v22.py | 66.66% | 3 Missing ⚠️
...rflow-plugin/src/datahub_airflow_plugin/_config.py | 93.33% | 1 Missing ⚠️

❌ Your patch status has failed because the patch coverage (80.00%) is below the target coverage (90.00%). You can increase the patch coverage or adjust the target coverage.

Files with missing lines | Coverage Δ
...ingestion/src/datahub/emitter/composite_emitter.py | 100.00% <100.00%> (ø)
...rflow-plugin/src/datahub_airflow_plugin/_config.py | 98.24% <93.33%> (ø)
...n/src/datahub_airflow_plugin/datahub_plugin_v22.py | 60.45% <66.66%> (ø)
...plugin/src/datahub_airflow_plugin/hooks/datahub.py | 78.52% <75.00%> (ø)

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update dd01c82...64f1d2d. Read the comment docs.

Collaborator

@hsheth2 hsheth2 left a comment

I don't think we should have multiple listeners - that would mean that we redo the SQL/lineage extraction work multiple times. imo the listener should remain a singleton.

We discussed the idea of a composite emitter - was there a reason we didn't go that route?

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Jan 21, 2025
@treff7es
Contributor Author

> I don't think we should have multiple listeners - that would mean that we redo the SQL/lineage extraction work multiple times. imo the listener should remain a singleton.
>
> We discussed the idea of a composite emitter - was there a reason we didn't go that route?

I'm sorry, I somehow thought this was a plan you wanted to implement in the future.

@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Jan 21, 2025
@hsheth2
Collaborator

hsheth2 commented Jan 21, 2025

@treff7es What I was thinking was

  • there would continue to be a single Listener class
  • listener.emitter -> would return a RestEmitter or KafkaEmitter if there's a single conn id, or a CompositeEmitter / MultiEmitter if there are multiple
  • the CompositeEmitter is basically a wrapper class, e.g. CompositeEmitter([emitter1, emitter2, ...]). It would only have a couple of methods (emit and close), each of which would be pretty simple, e.g. for emitter in self._inner_emitters: emitter.emit(...). It would live in metadata-ingestion, not in the airflow plugin.
  • The config type / listener init method would be responsible for creating the CompositeEmitter instance
  • The rest of the listener's implementation can continue to call listener.emitter.emit(...) and hence would be largely unchanged
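
The design outlined in the list above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code merged in this PR: the `Emitter` protocol, the `RecordingEmitter` stand-in, and the `make_emitter` helper are hypothetical names introduced here, and callback handling is left out for brevity.

```python
from typing import Any, List, Protocol, Sequence


class Emitter(Protocol):
    """Hypothetical minimal interface; real DataHub emitters expose more."""

    def emit(self, item: Any) -> None: ...

    def close(self) -> None: ...


class CompositeEmitter:
    """Wrapper that forwards each call to every inner emitter, in order."""

    def __init__(self, emitters: Sequence[Emitter]) -> None:
        if not emitters:
            raise ValueError("CompositeEmitter needs at least one inner emitter")
        self._inner_emitters: List[Emitter] = list(emitters)

    def emit(self, item: Any) -> None:
        for emitter in self._inner_emitters:
            emitter.emit(item)

    def close(self) -> None:
        for emitter in self._inner_emitters:
            emitter.close()


class RecordingEmitter:
    """Stand-in for a RestEmitter / KafkaEmitter, used only in this sketch."""

    def __init__(self) -> None:
        self.items: List[Any] = []
        self.closed = False

    def emit(self, item: Any) -> None:
        self.items.append(item)

    def close(self) -> None:
        self.closed = True


def make_emitter(emitters: Sequence[Emitter]) -> Emitter:
    """Return a lone emitter unchanged; wrap several in a CompositeEmitter."""
    if len(emitters) == 1:
        return emitters[0]
    return CompositeEmitter(emitters)
```

With this shape, the listener only ever holds one object and calls `emit` on it, so the extraction work runs once regardless of how many destinations are configured.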

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Jan 21, 2025
@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Jan 22, 2025
Comment on lines 24 to 30
callback_called = False
for emitter in self.emitters:
    if not callback_called:
        emitter.emit(item, callback)
        callback_called = True
    else:
        emitter.emit(item)
Collaborator

might also make sense to add a comment explaining that we want the callback to only be called once, and we tie that call to the first emitter. so we should explicitly clarify that errors from other emitters will be suppressed

Suggested change
- callback_called = False
- for emitter in self.emitters:
-     if not callback_called:
-         emitter.emit(item, callback)
-         callback_called = True
-     else:
-         emitter.emit(item)
+ for i, emitter in enumerate(self.emitters):
+     emitter.emit(item, callback if i == 0 else None)
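
A small standalone check can show that the two forms hand the callback to the same emitter. The sketch below pulls both loops out into free functions and uses a hypothetical `RecordingEmitter`; it assumes that an emitter treats an omitted callback and `callback=None` identically, which is not confirmed by this thread.

```python
from typing import Any, Callable, List, Optional, Sequence, Tuple


class RecordingEmitter:
    """Hypothetical stand-in that records the arguments of each emit call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, Optional[Callable]]] = []

    def emit(self, item: Any, callback: Optional[Callable] = None) -> None:
        self.calls.append((item, callback))


def emit_with_flag(emitters: Sequence[RecordingEmitter], item: Any,
                   callback: Optional[Callable] = None) -> None:
    """Original form: a flag tracks whether the callback was already passed."""
    callback_called = False
    for emitter in emitters:
        if not callback_called:
            emitter.emit(item, callback)
            callback_called = True
        else:
            emitter.emit(item)


def emit_with_enumerate(emitters: Sequence[RecordingEmitter], item: Any,
                        callback: Optional[Callable] = None) -> None:
    """Suggested form: only the emitter at index 0 receives the callback."""
    for i, emitter in enumerate(emitters):
        emitter.emit(item, callback if i == 0 else None)


def on_done(*args: Any) -> None:
    """Placeholder callback; its real signature is not shown in this thread."""


first = [RecordingEmitter() for _ in range(3)]
second = [RecordingEmitter() for _ in range(3)]
emit_with_flag(first, "item", on_done)
emit_with_enumerate(second, "item", on_done)
```

In both variants only the first emitter ever sees `on_done`, so a failure in the second or third emitter is never reported through the callback. That is the behaviour the review comment asks to document explicitly.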

@@ -491,6 +500,21 @@ def test_airflow_plugin(
],
)

if test_case.multiple_connections and not is_v1:
Collaborator

I think this comment still applies

@@ -68,11 +71,19 @@ class DatahubLineageConfig(ConfigModel):

     disable_openlineage_plugin: bool = True

-    def make_emitter_hook(self) -> "DatahubGenericHook":
+    def make_emitter_hook(self) -> Union["DatahubGenericHook", "DatahubCompositeHook"]:
Collaborator

just a thought, not necessary to do: should the DatahubGenericHook be the thing responsible for constructing the composite emitter?

e.g. you can pass it either a string or a list of strings, and it does the right thing
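
One way to read the "string or list of strings" idea is a small normalization step at the top of the hook. The sketch below is only an illustration: `normalize_conn_ids` and `pick_emitter_kind` are hypothetical helpers, not part of `DatahubGenericHook`, and the connection ids in the usage are made up.

```python
from typing import List, Sequence, Union


def normalize_conn_ids(conn_ids: Union[str, Sequence[str]]) -> List[str]:
    """Accept a single connection id or several; always return a list."""
    # Check for str first: a str is itself a sequence of characters.
    if isinstance(conn_ids, str):
        return [conn_ids]
    ids = list(conn_ids)
    if not ids:
        raise ValueError("at least one connection id is required")
    return ids


def pick_emitter_kind(conn_ids: Union[str, Sequence[str]]) -> str:
    """Hypothetical dispatch: one id means a plain emitter, several a composite."""
    ids = normalize_conn_ids(conn_ids)
    return "single" if len(ids) == 1 else "composite"


kind_one = pick_emitter_kind("conn_a")
kind_many = pick_emitter_kind(["conn_a", "conn_b"])
```

Callers would then not need to know which hook class to construct, at the cost of the hook carrying both the single and the composite code path.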

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Jan 24, 2025
@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Jan 27, 2025
@datahub-cyborg datahub-cyborg bot added pending-submitter-merge and removed needs-review Label for PRs that need review from a maintainer. labels Jan 29, 2025
@treff7es treff7es merged commit 22e012a into datahub-project:master Jan 29, 2025
187 of 190 checks passed
Labels
ingestion PR or Issue related to the ingestion of metadata pending-submitter-merge