Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add preset source #10954

Merged
merged 23 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9bc43b1
datahub metadata-ingestion support for preset
llance Jul 19, 2024
d677b06
cleanup unused import
llance Jul 22, 2024
29da83b
Add preset tests
hwmarkcheng Jul 24, 2024
13c8c29
Add back test respone
hwmarkcheng Jul 24, 2024
ab34cc7
fix unit tests
hwmarkcheng Jul 25, 2024
8a23dd5
Add versioning to integration test
hwmarkcheng Jul 25, 2024
06b045d
Merge branch 'datahub-project:master' into master
llance Jul 25, 2024
4e03a23
Merge pull request #1 from llance/add-preset-tests
hwmarkcheng Jul 25, 2024
8f7bfed
Merge branch 'datahub-project:master' into master
llance Jul 30, 2024
23d88e7
remove preset specific config keys
llance Jul 30, 2024
5c4211c
Merge pull request #2 from llance/shuixi/cleanup-superset
llance Jul 31, 2024
9e9b79a
Merge branch 'datahub-project:master' into master
llance Jul 31, 2024
8708be6
Merge branch 'datahub-project:master' into master
llance Aug 7, 2024
a9a9ecc
Merge branch 'datahub-project:master' into master
llance Aug 20, 2024
6741c51
working state
llance Aug 27, 2024
36cffd9
Address PR comments
hwmarkcheng Sep 30, 2024
1e9c3c9
Fix session
hwmarkcheng Sep 30, 2024
a566d88
Merge pull request #5 from llance/fix-preset-ingestion
hwmarkcheng Sep 30, 2024
a823082
Merge branch 'master' into master
hwmarkcheng Sep 30, 2024
dd10b4b
Fix Linting Issues
hwmarkcheng Oct 7, 2024
a3ab921
Merge pull request #6 from llance/fix-lint-issues
hwmarkcheng Oct 7, 2024
6258bd9
Update Preset Tests
hwmarkcheng Oct 9, 2024
088af77
Merge pull request #7 from llance/fix-preset-tests
hwmarkcheng Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
Expand Down
119 changes: 119 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.configuration import ConfigModel

from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)

from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)

from datahub.utilities import config_clean\

from datahub.ingestion.source.superset import SupersetSource

logger = logging.getLogger(__name__)
class PresetConfig(StatefulIngestionConfigBase, ConfigModel):
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
)
connect_uri: str = Field(default=None, description="Preset workspace URL.")
display_uri: Optional[str] = Field(
default=None,
description="optional URL to use in links (if `connect_uri` is only for ingestion)",
)
api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Preset Stateful Ingestion Config."
)

options: Dict = Field(default={}, description="")
env: str = Field(
default=DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs",
)
database_alias: Dict[str, str] = Field(
default={},
description="Can be used to change mapping for database names in superset to what you have in datahub",
)

@validator("connect_uri", "display_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)

@root_validator
def default_display_uri_to_connect_uri(cls, values):
base = values.get("display_uri")
if base is None:
values["display_uri"] = values.get("connect_uri")
return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
"""
Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
"""

config: PresetConfig
report: StaleEntityRemovalSourceReport
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.login()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if super() already calls login, we don't need to explicitly call it again, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're actually looking to overwrite the login in superset since the authentication method is different (Superset is username/password, Preset is API token based name/secret and also has a different URL)


def login(self):
logger.info(f"self.config is {self.config}")

login_response = requests.post(
f"{self.config.manager_uri}/v1/auth/",
json={"name": self.config.api_key, "secret": self.config.api_secret},
)

self.access_token = login_response.json()["payload"]["access_token"]
logger.debug("Got access token from Preset")

self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)

# Test the connection
test_response = self.session.get(f"{self.config.connect_uri}/version")
if not test_response.ok:
logger.error("Unable to connect to workspace")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper error handling for the login process.

The login method should handle potential errors during the authentication process more robustly.

-        login_response = requests.post(
+        try:
+            login_response = requests.post(
-        self.access_token = login_response.json()["payload"]["access_token"]
+            login_response.raise_for_status()
+            self.access_token = login_response.json()["payload"]["access_token"]
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Failed to authenticate with Preset: {e}")
+            raise

Committable suggestion was skipped due to low confidence.

8 changes: 7 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import (
EnvConfigMixin,
Expand All @@ -19,7 +20,10 @@
make_dataset_urn,
make_domain_urn,
)
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.emitter.mcp_builder import (
add_domain_to_entity_wu
)
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -179,7 +183,9 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.login()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be cleaner to make this self.session = self.login()

also the domain_registry logic should not be in the login method


def login(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Address the TODO comment.

There is a TODO comment regarding how to message about connection errors. Ensure that appropriate error messaging is implemented.

Do you want me to help implement the error messaging or open a GitHub issue to track this task?

login_response = requests.post(
f"{self.config.connect_uri}/api/v1/security/login",
json={
Expand Down
Loading