Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Dagster Openmetadata Ingestion Pipeline #1484

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ dependencies = [
"dagster-slack ~=0.26.1",
"dagster-webserver ~= 1.10",
"dbt-duckdb ~= 1.9.0",
"dbt-trino ~= 1.9.0",
"flatten-dict ~= 0.4.2",
"fsspec ~=2025.2.0",
"gcsfs (>=2025.2.0,<2025.3.0)",
"httpx ~= 0.28.0",
"hvac ~= 2.3.0",
"jsonlines ~= 4.0.0",
"openmetadata-ingestion[trino]>=1.6.4.0",
"polars ~= 1.19",
"pyarrow ~=19.0.0",
"pydantic ~= 2.10.0",
Expand Down
Empty file added src/__init__.py
Empty file.
Empty file.
31 changes: 31 additions & 0 deletions src/ol_orchestrate/assets/platform/openmetadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dagster import (
AssetExecutionContext,
AssetKey,
DataVersion,
Output,
asset,
)
from metadata.workflow.metadata import MetadataWorkflow

from ol_orchestrate.partitions.openmetadata import TRINO_DAILY_PARTITIONS
from ol_orchestrate.resources.openmetadata.trino import trino_config


@asset(
description=("An instance of metadata from our Trino database."),
group_name="platform",
key=AssetKey(["platform", "database", "trino", "metadata"]),
partitions_def=TRINO_DAILY_PARTITIONS,
)
def trino_metadata(context: AssetExecutionContext):
date = context.partition_key
workflow = MetadataWorkflow.create(trino_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
return Output(
workflow,
data_version=DataVersion(workflow["published_version"]),
metadata={"source": "Trino", "date": date},
)
30 changes: 30 additions & 0 deletions src/ol_orchestrate/definitions/platform/openmetadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from dagster import create_repository_using_definitions_args
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.platform.openmetadata import trino_metadata
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.resources.secrets.vault import Vault

if DAGSTER_ENV == "dev":
vault = Vault(vault_addr=VAULT_ADDRESS, vault_auth_type="github")
vault._auth_github() # noqa: SLF001
else:
vault = Vault(
vault_addr=VAULT_ADDRESS, vault_role="dagster-server", aws_auth_mount="aws"
)
vault._auth_aws_iam() # noqa: SLF001


deployment_name = "openmetadata"
deployment_assets = [
trino_metadata,
]

create_repository_using_definitions_args(
name=f"{deployment_name}_assets",
resources={
"vault": vault,
"s3": S3Resource(),
},
assets=deployment_assets,
)
4 changes: 4 additions & 0 deletions src/ol_orchestrate/partitions/openmetadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from dagster import DailyPartitionsDefinition

# Define daily partitions
TRINO_DAILY_PARTITIONS = DailyPartitionsDefinition(start_date="2023-01-01")
Empty file.
44 changes: 44 additions & 0 deletions src/ol_orchestrate/resources/openmetadata/trino.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# treat this as a dictionary and reference variables in the appropriate locations
trino_config = {
"source": {
"type": "trino",
"serviceName": "Starburst Galaxy",
"serviceConnection": {
"config": {
"type": "Trino",
"hostPort": "mitol-ol-data-lake-production.trino.galaxy.starburst.io:443", # noqa: E501
"username": "[email protected]",
"authType": {
"password": "<retrieve this from https://vault-production.odl.mit.edu/ui/vault/secrets/platform-secrets/kv/starburst-galaxy/details?version=4>"
},
Comment on lines +12 to +13

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Storing credentials directly in the code is a security risk. Consider using a more secure method, such as retrieving the password from Vault or another secrets management system. This is a critical security concern that needs to be addressed before merging.

                    "password": vault.read_secret("path/to/secret")["password"] # Retrieve from Vault

"catalog": "ol_data_lake_production",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
"markDeletedTables": True,
"markDeletedStoredProcedures": True,
"includeTables": True,
"includeViews": True,
"includeTags": True,
"includeOwners": True,
"includeStoredProcedures": True,
"includeDDL": True,
"threads": 10,
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": {
"hostPort": "https://open-metadata-qa.ol.mit.edu/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "<retrieve this from https://open-metadata-ci.ol.mit.edu/bots/ingestion-bot>"
},
Comment on lines +39 to +40

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Similar to the password, the JWT token should be retrieved from a secure source like Vault, rather than being hardcoded in the configuration. This is a critical security concern that needs to be addressed before merging.

                "jwtToken": vault.read_secret("path/to/jwt")["jwtToken"] # Retrieve from Vault

"storeServiceConnection": True,
},
},
}
1 change: 1 addition & 0 deletions src/ol_orchestrate/workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ load_from:
- python_module: ol_orchestrate.definitions.edx.sync_program_credential_reports
- python_module: ol_orchestrate.definitions.lakehouse.elt
- python_module: ol_orchestrate.definitions.platform.notification
- python_module: ol_orchestrate.definitions.platform.openmetadata
- python_module: ol_orchestrate.repositories.edx_gcs_courses
- python_module: ol_orchestrate.repositories.open_edx
721 changes: 625 additions & 96 deletions uv.lock

Large diffs are not rendered by default.