From 15fe2679c9baf710c8a8db2f925466d20bcdf93f Mon Sep 17 00:00:00 2001
From: Beto Dealmeida
Date: Wed, 7 Jun 2023 09:16:13 -0700
Subject: [PATCH] feat: Preset.io source (Superset SaaS)

---
 .../docs/sources/superset/preset_recipe.yml   | 13 +++
 .../src/datahub/ingestion/source/superset.py  | 93 +++++++++++++++++++
 .../integration/superset/test_superset.py     | 54 ++++++++++-
 3 files changed, 158 insertions(+), 2 deletions(-)
 create mode 100644 metadata-ingestion/docs/sources/superset/preset_recipe.yml

diff --git a/metadata-ingestion/docs/sources/superset/preset_recipe.yml b/metadata-ingestion/docs/sources/superset/preset_recipe.yml
new file mode 100644
index 00000000000000..99978a154acc49
--- /dev/null
+++ b/metadata-ingestion/docs/sources/superset/preset_recipe.yml
@@ -0,0 +1,13 @@
+source:
+  type: preset
+  config:
+    # Coordinates
+    connect_uri: https://abcdef01.us1a.app.preset.io
+    manager_uri: https://api.app.preset.io
+
+    # Credentials
+    api_key: key
+    api_secret: secret
+
+sink:
+  # sink configs
diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index 7bf19db25e3bb1..6fe71145ed08dc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -412,3 +412,96 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
 
     def get_report(self) -> StaleEntityRemovalSourceReport:
         return self.report
+
+
+class PresetConfig(StatefulIngestionConfigBase, ConfigModel):
+    """Configuration for ingesting from a Preset.io (Superset SaaS) workspace."""
+
+    # See the Preset.io API docs: https://api-docs.preset.io/
+    # No trailing slash: the auth endpoint is built as f"{manager_uri}/v1/auth/".
+    manager_uri: str = Field(
+        default="https://api.app.preset.io", description="Preset.io API URL"
+    )
+    # Required: there is no sensible default workspace to connect to.
+    connect_uri: str = Field(description="Preset workspace URL.")
+    display_uri: Optional[str] = Field(
+        default=None,
+        description="optional URL to use in links (if `connect_uri` is only for ingestion)",
+    )
+    api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
+    api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")
+
+    # Configuration for stateful ingestion
+    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
+        default=None, description="Superset Stateful Ingestion Config."
+    )
+
+    options: Dict = Field(default={}, description="")
+    env: str = Field(
+        default=DEFAULT_ENV,
+        description="Environment to use in namespace when constructing URNs",
+    )
+    database_alias: Dict[str, str] = Field(
+        default={},
+        description="Can be used to change mapping for database names in superset to what you have in datahub",
+    )
+
+    @validator("manager_uri", "connect_uri", "display_uri")
+    def remove_trailing_slash(cls, v):
+        """Strip trailing slashes so URL paths can be appended with a single `/`."""
+        return config_clean.remove_trailing_slashes(v)
+
+    @root_validator
+    def default_display_uri_to_connect_uri(cls, values):
+        """Fall back to `connect_uri` for links when `display_uri` is not set."""
+        base = values.get("display_uri")
+        if base is None:
+            values["display_uri"] = values.get("connect_uri")
+        return values
+
+
+@platform_name("Preset")
+@config_class(PresetConfig)
+@support_status(SupportStatus.TESTING)
+@capability(
+    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
+)
+class PresetSource(SupersetSource):
+    """
+    Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
+    """
+
+    config: PresetConfig
+    platform = "preset"
+
+    def __init__(self, ctx: PipelineContext, config: PresetConfig):
+        """Authenticate against the Preset manager API and open a workspace session."""
+        # Initialize the stateful-ingestion base directly instead of via super();
+        # presumably SupersetSource.__init__ does a Superset-specific login (confirm).
+        StatefulIngestionSourceBase.__init__(self, config, ctx)
+        self.config = config
+        self.report = StaleEntityRemovalSourceReport()
+
+        login_response = requests.post(
+            f"{self.config.manager_uri}/v1/auth/",
+            json={"name": self.config.api_key, "secret": self.config.api_secret},
+        )
+        # Fail with the HTTP error rather than a KeyError on a non-token body.
+        login_response.raise_for_status()
+
+        self.access_token = login_response.json()["payload"]["access_token"]
+        logger.debug("Got access token from Preset")
+
+        self.session = requests.Session()
+        self.session.headers.update(
+            {
+                "Authorization": f"Bearer {self.access_token}",
+                "Content-Type": "application/json",
+                "Accept": "*/*",
+            }
+        )
+
+        # Test the connection
+        test_response = self.session.get(f"{self.config.connect_uri}/version")
+        if not test_response.ok:
+            logger.error("Unable to connect to workspace")
diff --git a/metadata-ingestion/tests/integration/superset/test_superset.py b/metadata-ingestion/tests/integration/superset/test_superset.py
index bc299e36515e18..dc4c493965ddde 100644
--- a/metadata-ingestion/tests/integration/superset/test_superset.py
+++ b/metadata-ingestion/tests/integration/superset/test_superset.py
@@ -26,6 +26,15 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
                 "access_token": "test_token",
             },
         },
+        "mock://api.app.preset.io/v1/auth/": {
+            "method": "POST",
+            "status_code": 200,
+            "json": {
+                "payload": {
+                    "access_token": "test_token",
+                },
+            },
+        },
         "mock://mock-domain.superset.com/api/v1/dashboard/": {
             "method": "GET",
             "status_code": 200,
@@ -151,7 +160,6 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
 @freeze_time(FROZEN_TIME)
 @pytest.mark.integration
 def test_superset_ingest(pytestconfig, tmp_path, mock_time, requests_mock):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/superset"
 
     register_mock_api(request_mock=requests_mock)
@@ -193,7 +201,6 @@ def test_superset_ingest(pytestconfig, tmp_path, mock_time, requests_mock):
 def test_superset_stateful_ingest(
     pytestconfig, tmp_path, mock_time, requests_mock, mock_datahub_graph
 ):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/superset"
 
     register_mock_api(request_mock=requests_mock)
@@ -306,3 +313,46 @@ def test_superset_stateful_ingest(
         output_path=deleted_mces_path,
         golden_path=test_resources_dir / "golden_test_stateful_ingest.json",
     )
+
+
+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration
+def test_preset_ingest(pytestconfig, tmp_path, mock_time, requests_mock):
+    """Run the Preset source end to end against the mocked API."""
+    test_resources_dir = pytestconfig.rootpath / "tests/integration/superset"
+
+    register_mock_api(request_mock=requests_mock)
+
+    pipeline = Pipeline.create(
+        {
+            "run_id": "preset-test",
+            "source": {
+                "type": "preset",
+                "config": {
+                    "connect_uri": "mock://mock-domain.superset.com/",
+                    # Point auth at the mocked manager endpoint, not the real API.
+                    "manager_uri": "mock://api.app.preset.io",
+                    "api_key": "KEY",
+                    "api_secret": "SECRET",
+                },
+            },
+            "sink": {
+                "type": "file",
+                "config": {
+                    "filename": f"{tmp_path}/preset_mces.json",
+                },
+            },
+        }
+    )
+
+    pipeline.run()
+    pipeline.raise_from_status()
+    # NOTE(review): this reuses the Superset golden file; confirm the emitted
+    # URNs match it given PresetSource sets platform = "preset".
+    golden_file = "golden_test_ingest.json"
+
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "preset_mces.json",
+        golden_path=f"{test_resources_dir}/{golden_file}",
+    )