Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add preset source #10954

Merged
merged 23 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9bc43b1
datahub metadata-ingestion support for preset
llance Jul 19, 2024
d677b06
cleanup unused import
llance Jul 22, 2024
29da83b
Add preset tests
hwmarkcheng Jul 24, 2024
13c8c29
Add back test response
hwmarkcheng Jul 24, 2024
ab34cc7
fix unit tests
hwmarkcheng Jul 25, 2024
8a23dd5
Add versioning to integration test
hwmarkcheng Jul 25, 2024
06b045d
Merge branch 'datahub-project:master' into master
llance Jul 25, 2024
4e03a23
Merge pull request #1 from llance/add-preset-tests
hwmarkcheng Jul 25, 2024
8f7bfed
Merge branch 'datahub-project:master' into master
llance Jul 30, 2024
23d88e7
remove preset specific config keys
llance Jul 30, 2024
5c4211c
Merge pull request #2 from llance/shuixi/cleanup-superset
llance Jul 31, 2024
9e9b79a
Merge branch 'datahub-project:master' into master
llance Jul 31, 2024
8708be6
Merge branch 'datahub-project:master' into master
llance Aug 7, 2024
a9a9ecc
Merge branch 'datahub-project:master' into master
llance Aug 20, 2024
6741c51
working state
llance Aug 27, 2024
36cffd9
Address PR comments
hwmarkcheng Sep 30, 2024
1e9c3c9
Fix session
hwmarkcheng Sep 30, 2024
a566d88
Merge pull request #5 from llance/fix-preset-ingestion
hwmarkcheng Sep 30, 2024
a823082
Merge branch 'master' into master
hwmarkcheng Sep 30, 2024
dd10b4b
Fix Linting Issues
hwmarkcheng Oct 7, 2024
a3ab921
Merge pull request #6 from llance/fix-lint-issues
hwmarkcheng Oct 7, 2024
6258bd9
Update Preset Tests
hwmarkcheng Oct 9, 2024
088af77
Merge pull request #7 from llance/fix-preset-tests
hwmarkcheng Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
Expand Down
114 changes: 114 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)


class PresetConfig(SupersetConfig):
    """Configuration for the Preset source.

    Extends ``SupersetConfig`` with Preset-specific connection settings:
    authentication happens against the Preset manager API (``manager_uri``)
    using an API key/secret pair, while metadata is read from a specific
    Preset workspace (``connect_uri``).
    """

    # Base URL of the Preset manager API used only for authentication.
    # NOTE(review): the sibling superset.py change in this PR adds the same
    # manager_uri/api_key/api_secret fields (with identical defaults) to
    # SupersetConfig, making these redeclarations redundant — confirm and
    # deduplicate.
    manager_uri: str = Field(
        default="https://api.app.preset.io", description="Preset.io API URL"
    )
    # URL of the Preset workspace to ingest from; trailing slashes are
    # stripped by the validator below.
    connect_uri: str = Field(default="", description="Preset workspace URL.")
    # Optional separate URL used when rendering links; falls back to
    # connect_uri via the root validator below when not set.
    display_uri: Optional[str] = Field(
        default=None,
        description="optional URL to use in links (if `connect_uri` is only for ingestion)",
    )
    # Credentials for the Preset manager API (/v1/auth/).
    api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
    api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

    # Configuration for stateful ingestion
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None, description="Preset Stateful Ingestion Config."
    )

    # Free-form options passed through to the source (no description given
    # upstream; presumably mirrors the Superset source's options field).
    options: Dict = Field(default={}, description="")
    # Environment string embedded in emitted URNs (e.g. PROD).
    env: str = Field(
        default=DEFAULT_ENV,
        description="Environment to use in namespace when constructing URNs",
    )
    # Maps database names as reported by Superset/Preset to the names used
    # in DataHub.
    database_alias: Dict[str, str] = Field(
        default={},
        description="Can be used to change mapping for database names in superset to what you have in datahub",
    )

    @validator("connect_uri", "display_uri")
    def remove_trailing_slash(cls, v):
        """Normalize URIs by stripping trailing slashes.

        Runs only on explicitly supplied values (pydantic v1 validators skip
        defaults unless ``always=True``), so the ``None`` default of
        ``display_uri`` never reaches ``remove_trailing_slashes``.
        """
        return config_clean.remove_trailing_slashes(v)

    @root_validator
    def default_display_uri_to_connect_uri(cls, values):
        """Default ``display_uri`` to ``connect_uri`` when it was not set."""
        base = values.get("display_uri")
        if base is None:
            values["display_uri"] = values.get("connect_uri")
        return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
    """
    Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).

    Differs from ``SupersetSource`` only in how it authenticates: a token is
    obtained from the Preset manager API (``manager_uri``) with an API
    key/secret pair instead of username/password against the workspace itself.
    """

    config: PresetConfig
    report: StaleEntityRemovalSourceReport
    platform = "preset"

    def __init__(self, ctx: PipelineContext, config: PresetConfig):
        # Lazy %s formatting; debug level — this is diagnostic output, not
        # an operational event.
        logger.debug("Creating Preset source with context: %s", ctx)

        super().__init__(ctx, config)
        self.config = config
        self.report = StaleEntityRemovalSourceReport()

    def login(self) -> requests.Session:
        """Authenticate with the Preset manager API and return a session.

        Posts the configured ``api_key``/``api_secret`` to
        ``{manager_uri}/v1/auth/`` and builds a ``requests.Session`` carrying
        the returned bearer token.

        Raises:
            requests.exceptions.RequestException: if the auth request fails
                or returns a non-2xx status (e.g. bad credentials).
        """
        try:
            login_response = requests.post(
                f"{self.config.manager_uri}/v1/auth/",
                json={"name": self.config.api_key, "secret": self.config.api_secret},
            )
            # Surface auth failures (401/403/5xx) as an HTTPError here rather
            # than as an opaque KeyError when reading the payload below.
            # HTTPError is a RequestException, so the handler below still
            # applies and callers see the same exception family.
            login_response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to authenticate with Preset: {e}")
            raise e

        self.access_token = login_response.json()["payload"]["access_token"]
        logger.debug("Got access token from Preset")

        requests_session = requests.Session()
        requests_session.headers.update(
            {
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json",
                "Accept": "*/*",
            }
        )
        # Test the connection. Best-effort: a failure is logged but the
        # session is still returned, matching the Superset source's behavior.
        test_response = requests_session.get(f"{self.config.connect_uri}/version")
        if not test_response.ok:
            logger.error("Unable to connect to workspace")
        return requests_session
28 changes: 18 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ class SupersetConfig(
)
username: Optional[str] = Field(default=None, description="Superset username.")
password: Optional[str] = Field(default=None, description="Superset password.")

api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Superset Stateful Ingestion Config."
Expand Down Expand Up @@ -179,7 +183,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)
self.session = self.login()

def login(self) -> requests.Session:
login_response = requests.post(
f"{self.config.connect_uri}/api/v1/security/login",
json={
Expand All @@ -193,26 +204,23 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
self.access_token = login_response.json()["access_token"]
logger.debug("Got access token from superset")

self.session = requests.Session()
self.session.headers.update(
requests_session = requests.Session()
requests_session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)

if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)

# Test the connection
test_response = self.session.get(f"{self.config.connect_uri}/api/v1/dashboard/")
test_response = requests_session.get(
f"{self.config.connect_uri}/api/v1/dashboard/"
)
if test_response.status_code == 200:
pass
# TODO(Gabe): how should we message about this error?
return requests_session

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
Expand Down
Loading
Loading