Shuixi/cleanup superset #11036

Closed · wants to merge 10 commits
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -700,6 +700,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
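For context, DataHub discovers ingestion sources through these setuptools entry points: the `type` field of a recipe is matched against the registered names. A minimal sketch of that resolution using only the standard library (the entry-point group name "datahub.ingestion.source.plugins" is assumed from this setup.py):

# Sketch: resolve the "preset" source the way setuptools entry points are loaded.
# Assumption: the group name matches the one used in this setup.py.
from importlib.metadata import entry_points

eps = entry_points(group="datahub.ingestion.source.plugins")
preset_ep = next(ep for ep in eps if ep.name == "preset")
PresetSource = preset_ep.load()  # -> datahub.ingestion.source.preset:PresetSource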
119 changes: 119 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
@@ -0,0 +1,119 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.configuration import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)
from datahub.ingestion.source.superset import SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)


class PresetConfig(StatefulIngestionConfigBase, ConfigModel):
    manager_uri: str = Field(
        default="https://api.app.preset.io", description="Preset.io API URL"
    )
    connect_uri: str = Field(default="", description="Preset workspace URL.")
    display_uri: Optional[str] = Field(
        default=None,
        description="Optional URL to use in links (if `connect_uri` is only used for ingestion).",
    )
    api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
    api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

    # Configuration for stateful ingestion
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None, description="Preset Stateful Ingestion Config."
    )

    options: Dict = Field(default={}, description="")
    env: str = Field(
        default=DEFAULT_ENV,
        description="Environment to use in namespace when constructing URNs.",
    )
    database_alias: Dict[str, str] = Field(
        default={},
        description="Mapping from database names in Superset to the corresponding database names in DataHub.",
    )

    @validator("connect_uri", "display_uri")
    def remove_trailing_slash(cls, v):
        return config_clean.remove_trailing_slashes(v)

    @root_validator
    def default_display_uri_to_connect_uri(cls, values):
        # Fall back to connect_uri for display links when display_uri is unset.
        base = values.get("display_uri")
        if base is None:
            values["display_uri"] = values.get("connect_uri")
        return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
    """
    Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
    """

    config: PresetConfig
    report: StaleEntityRemovalSourceReport
    platform = "preset"
    def __init__(self, ctx: PipelineContext, config: PresetConfig):
        super().__init__(ctx, config)
        self.config = config
        self.report = StaleEntityRemovalSourceReport()
        self.login()

    def login(self):
        # Exchange the API key/secret for a short-lived bearer token.
        login_response = requests.post(
            f"{self.config.manager_uri}/v1/auth/",
            json={"name": self.config.api_key, "secret": self.config.api_secret},
        )
        login_response.raise_for_status()

        self.access_token = login_response.json()["payload"]["access_token"]
        logger.debug("Got access token from Preset")

        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json",
                "Accept": "*/*",
            }
        )

        # Test the connection
        test_response = self.session.get(f"{self.config.connect_uri}/version")
        if not test_response.ok:
            logger.error("Unable to connect to workspace")
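For an end-to-end picture, a minimal usage sketch: the workspace URL and credentials below are placeholders, `Pipeline.create` is DataHub's programmatic entry point for running a recipe, and `pipeline_name` is included since stateful ingestion keys its saved state by pipeline name.

# Hypothetical recipe; URLs and credentials are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "pipeline_name": "preset_ingestion",
        "source": {
            "type": "preset",  # resolved via the entry point added in setup.py
            "config": {
                "connect_uri": "https://my-workspace.preset.io",
                "api_key": "<api-key>",
                "api_secret": "<api-secret>",
                # Optional: detect and remove stale entities between runs.
                "stateful_ingestion": {"enabled": True},
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()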
8 changes: 7 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -8,6 +8,7 @@
 from pydantic.class_validators import root_validator, validator
 from pydantic.fields import Field
 
+from datahub.configuration import ConfigModel
 from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.source_common import (
     EnvConfigMixin,
@@ -19,7 +20,10 @@
     make_dataset_urn,
     make_domain_urn,
 )
-from datahub.emitter.mcp_builder import add_domain_to_entity_wu
+from datahub.emitter.mcp_builder import (
+    add_domain_to_entity_wu
+)
+from datahub.emitter.mce_builder import DEFAULT_ENV
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -178,7 +182,9 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
         super().__init__(config, ctx)
         self.config = config
         self.report = StaleEntityRemovalSourceReport()
+        self.login()
 
+    def login(self):
         login_response = requests.post(
             f"{self.config.connect_uri}/api/v1/security/login",
             json={
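This refactor is what enables the Preset subclass: by extracting authentication into a `login()` hook called from `__init__`, `SupersetSource` lets `PresetSource` replace only the auth flow while inheriting the rest of the extraction logic. A distilled sketch of the pattern, with class names and request bodies simplified from the diffs above:

import requests


class SupersetLikeSource:
    """Distilled pattern: __init__ delegates authentication to login()."""

    def __init__(self, connect_uri: str):
        self.connect_uri = connect_uri
        self.session = requests.Session()
        self.login()  # dispatches to the most-derived override

    def login(self) -> None:
        # Base behavior: username/password login against the Superset API.
        resp = self.session.post(
            f"{self.connect_uri}/api/v1/security/login",
            json={"username": "<user>", "password": "<pass>", "provider": "db"},
        )
        resp.raise_for_status()


class PresetLikeSource(SupersetLikeSource):
    def login(self) -> None:
        # Subclass swaps only the auth flow; everything else is inherited.
        resp = self.session.post(
            "https://api.app.preset.io/v1/auth/",
            json={"name": "<api-key>", "secret": "<api-secret>"},
        )
        resp.raise_for_status()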