Skip to content

Commit

Permalink
feat: Preset.io source (Superset SaaS)
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida committed Jun 7, 2023
1 parent 57c5d2f commit 15fe267
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 2 deletions.
13 changes: 13 additions & 0 deletions metadata-ingestion/docs/sources/superset/preset_recipe.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Example ingestion recipe for the Preset.io source.
source:
  type: preset
  config:
    # Coordinates
    # connect_uri is the workspace URL; manager_uri is the Preset.io API URL.
    connect_uri: https://abcdef01.us1a.app.preset.io
    manager_uri: https://api.app.preset.io

    # Credentials
    api_key: key
    api_secret: secret

sink:
  # sink configs
82 changes: 82 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,3 +412,85 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:

def get_report(self) -> StaleEntityRemovalSourceReport:
    """Return the report object stored on this source as ``self.report``."""
    current_report = self.report
    return current_report


class PresetConfig(StatefulIngestionConfigBase, ConfigModel):
    # Settings for the Preset.io source.
    # See the Preset.io API docs: https://api-docs.preset.io/

    # Preset.io API base URL. The default carries no trailing slash because
    # request paths are appended as "/v1/auth/", and pydantic does not run
    # field validators on default values unless explicitly asked to.
    manager_uri: str = Field(
        default="https://api.app.preset.io", description="Preset.io API URL"
    )
    # NOTE(review): annotated `str` but defaults to None; the source formats
    # this into request URLs, so it is effectively required. Confirm whether
    # it should be made mandatory.
    connect_uri: str = Field(default=None, description="Preset workspace URL.")
    display_uri: Optional[str] = Field(
        default=None,
        description="optional URL to use in links (if `connect_uri` is only for ingestion)",
    )
    api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
    api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

    # Configuration for stateful ingestion
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None, description="Superset Stateful Ingestion Config."
    )

    options: Dict = Field(default={}, description="")
    env: str = Field(
        default=DEFAULT_ENV,
        description="Environment to use in namespace when constructing URNs",
    )
    database_alias: Dict[str, str] = Field(
        default={},
        description="Can be used to change mapping for database names in superset to what you have in datahub",
    )

    # manager_uri is normalised together with the workspace URLs so that a
    # user-supplied "https://api.app.preset.io/" does not yield "//v1/auth/".
    @validator("manager_uri", "connect_uri", "display_uri")
    def remove_trailing_slash(cls, v):
        return config_clean.remove_trailing_slashes(v)

    @root_validator
    def default_display_uri_to_connect_uri(cls, values):
        # When no display_uri is given, fall back to connect_uri.
        base = values.get("display_uri")
        if base is None:
            values["display_uri"] = values.get("connect_uri")
        return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
    """
    Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
    """

    config: PresetConfig
    platform = "preset"

    def __init__(self, ctx: PipelineContext, config: PresetConfig):
        # SupersetSource.__init__ is intentionally bypassed and the stateful
        # base class is initialised directly; the explicit unbound call must
        # pass `self` as its first argument.
        # NOTE(review): presumably the parent constructor performs the
        # Superset-style login, which Preset replaces below -- confirm.
        StatefulIngestionSourceBase.__init__(self, config, ctx)
        self.config = config
        self.report = StaleEntityRemovalSourceReport()

        # Exchange the API key/secret for a bearer token. Trailing slashes are
        # stripped so the joined URL never contains "//v1/auth/".
        manager_uri = self.config.manager_uri.rstrip("/")
        login_response = requests.post(
            f"{manager_uri}/v1/auth/",
            json={"name": self.config.api_key, "secret": self.config.api_secret},
        )
        # Surface rejected credentials as an HTTP error rather than as an
        # opaque KeyError when indexing the response body.
        login_response.raise_for_status()

        self.access_token = login_response.json()["payload"]["access_token"]
        logger.debug("Got access token from Preset")

        # All subsequent workspace requests share this authenticated session.
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json",
                "Accept": "*/*",
            }
        )

        # Test the connection (a failure is only logged, not raised)
        test_response = self.session.get(f"{self.config.connect_uri}/version")
        if not test_response.ok:
            logger.error("Unable to connect to workspace")
49 changes: 47 additions & 2 deletions metadata-ingestion/tests/integration/superset/test_superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
"access_token": "test_token",
},
},
"mock://api.app.preset.io/v1/auth/": {
"method": "POST",
"status_code": 200,
"json": {
"payload": {
"access_token": "test_token",
},
},
},
"mock://mock-domain.superset.com/api/v1/dashboard/": {
"method": "GET",
"status_code": 200,
Expand Down Expand Up @@ -151,7 +160,6 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_superset_ingest(pytestconfig, tmp_path, mock_time, requests_mock):

test_resources_dir = pytestconfig.rootpath / "tests/integration/superset"

register_mock_api(request_mock=requests_mock)
Expand Down Expand Up @@ -193,7 +201,6 @@ def test_superset_ingest(pytestconfig, tmp_path, mock_time, requests_mock):
def test_superset_stateful_ingest(
pytestconfig, tmp_path, mock_time, requests_mock, mock_datahub_graph
):

test_resources_dir = pytestconfig.rootpath / "tests/integration/superset"

register_mock_api(request_mock=requests_mock)
Expand Down Expand Up @@ -306,3 +313,41 @@ def test_superset_stateful_ingest(
output_path=deleted_mces_path,
golden_path=test_resources_dir / "golden_test_stateful_ingest.json",
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_preset_ingest(pytestconfig, tmp_path, mock_time, requests_mock):
    """Run the `preset` source against the mocked API and compare to the golden file."""
    test_resources_dir = pytestconfig.rootpath / "tests/integration/superset"

    register_mock_api(request_mock=requests_mock)

    pipeline = Pipeline.create(
        {
            "run_id": "preset-test",
            "source": {
                "type": "preset",
                "config": {
                    "connect_uri": "mock://mock-domain.superset.com/",
                    # Point authentication at the mocked manager endpoint
                    # ("mock://api.app.preset.io/v1/auth/"); without this the
                    # source would call the real https:// default.
                    "manager_uri": "mock://api.app.preset.io",
                    "api_key": "KEY",
                    "api_secret": "SECRET",
                },
            },
            "sink": {
                "type": "file",
                "config": {
                    "filename": f"{tmp_path}/preset_mces.json",
                },
            },
        }
    )

    pipeline.run()
    pipeline.raise_from_status()
    golden_file = "golden_test_ingest.json"

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "preset_mces.json",
        golden_path=f"{test_resources_dir}/{golden_file}",
    )

0 comments on commit 15fe267

Please sign in to comment.