fix(ingest): remove default value from DatahubClientConfig.server #11570

Merged · 5 commits · Oct 16, 2024
Changes from all commits
4 changes: 3 additions & 1 deletion docs/how/updating-datahub.md
@@ -25,7 +25,9 @@ This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.
- #11484 - Rest API authorization enabled by default
- #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases.
- #11619 - schema field/column paths can no longer be empty strings
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s `server` field no longer defaults to `http://localhost:8080`. Be sure to set it explicitly.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.

### Potential Downtime

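For anyone upgrading, a minimal sketch of the explicit client setup the changelog now requires (the URL and token values below are placeholders, not defaults):

```python
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# server must now be supplied explicitly; there is no implicit
# http://localhost:8080 fallback anymore.
config = DatahubClientConfig(
    server="http://localhost:8080",  # placeholder: point this at your GMS
    token=None,  # set a token if your deployment requires auth
)
graph = DataHubGraph(config)  # connects to the configured server
```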
@@ -78,12 +78,6 @@ class Constant:
# Default config constants
DEFAULT_DATAHUB_REST_URL = "http://localhost:8080"

# Environment variable constants
DATAHUB_REST_URL = "DATAHUB_REST_URL"
DATAHUB_ENV = "DATAHUB_ENV"
DATAHUB_PLATFORM_INSTANCE = "DATAHUB_PLATFORM_INSTANCE"
DAGSTER_UI_URL = "DAGSTER_UI_URL"

# Datahub inputs/outputs constant
DATAHUB_INPUTS = "datahub.inputs"
DATAHUB_OUTPUTS = "datahub.outputs"
@@ -154,7 +148,6 @@ class DatasetLineage(NamedTuple):

class DatahubDagsterSourceConfig(DatasetSourceConfigMixin):
datahub_client_config: DatahubClientConfig = pydantic.Field(
default=DatahubClientConfig(),
description="Datahub client config",
)

@@ -1,5 +1,6 @@
import os
import traceback
import warnings
from collections import defaultdict
from types import ModuleType
from typing import Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
@@ -38,7 +39,7 @@
from dagster._core.events import DagsterEventType, HandledOutputData, LoadedInputData
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import SubTypesClass
from datahub.sql_parsing.sqlglot_lineage import (
SqlParsingResult,
@@ -47,6 +48,7 @@
from datahub.utilities.urns.dataset_urn import DatasetUrn

from datahub_dagster_plugin.client.dagster_generator import (
Constant,
DagsterEnvironment,
DagsterGenerator,
DatahubDagsterSourceConfig,
@@ -182,7 +184,17 @@ def __init__(
if config:
self.config = config
else:
self.config = DatahubDagsterSourceConfig()
# This is a temporary warning for backwards compatibility. Eventually, we'll remove this
# branch and make the config required.
warnings.warn(
"Using the default DataHub client config is deprecated. Pass in a config object explicitly.",
stacklevel=2,
)
self.config = DatahubDagsterSourceConfig(
datahub_client_config=DatahubClientConfig(
server=Constant.DEFAULT_DATAHUB_REST_URL
)
)
self.graph = DataHubGraph(
self.config.datahub_client_config,
)
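To avoid the new deprecation warning in the Dagster plugin, callers can build the config explicitly; a sketch assuming it is then passed to the `__init__` shown above (the server URL is a placeholder):

```python
from datahub.ingestion.graph.client import DatahubClientConfig

from datahub_dagster_plugin.client.dagster_generator import DatahubDagsterSourceConfig

# Constructing the config explicitly takes the non-deprecated branch
# of the __init__ above.
config = DatahubDagsterSourceConfig(
    datahub_client_config=DatahubClientConfig(
        server="http://localhost:8080",  # placeholder GMS endpoint
    ),
)
```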
@@ -117,7 +117,7 @@ class DataProduct(ConfigModel):
@pydantic.validator("assets", each_item=True)
def assets_must_be_urns(cls, v: str) -> str:
try:
Urn.create_from_string(v)
Urn.from_string(v)
except Exception as e:
raise ValueError(f"asset {v} is not an urn: {e}") from e

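Since the validator above now calls `Urn.from_string`, a one-line sketch of the renamed call (the `Urn` import path is an assumption; only the method rename appears in this diff):

```python
from datahub.utilities.urns.urn import Urn  # assumed import path

# from_string replaces the deprecated create_from_string
urn = Urn.from_string("urn:li:corpuser:datahub")
```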
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/graph/config.py
@@ -8,7 +8,7 @@ class DatahubClientConfig(ConfigModel):

# TODO: Having a default for the server doesn't make a ton of sense. This should be handled
# by callers / the CLI, but the actual client should not have any magic.
server: str = "http://localhost:8080"
server: str
token: Optional[str] = None
timeout_sec: Optional[int] = None
retry_status_codes: Optional[List[int]] = None
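A side effect worth noting, sketched under standard pydantic semantics: instantiating the config without a server now fails validation rather than silently targeting localhost:

```python
import pydantic

from datahub.ingestion.graph.config import DatahubClientConfig

try:
    DatahubClientConfig()  # used to default to http://localhost:8080
except pydantic.ValidationError as e:
    print(e)  # reports that the "server" field is required
```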
@@ -17,7 +17,7 @@


class DatahubIngestionStateProviderConfig(IngestionCheckpointingProviderConfig):
datahub_api: DatahubClientConfig = DatahubClientConfig()
datahub_api: Optional[DatahubClientConfig] = None


class DatahubIngestionCheckpointingProvider(IngestionCheckpointingProviderBase):
@@ -31,20 +31,24 @@ def __init__(
self.graph = graph
if not self._is_server_stateful_ingestion_capable():
raise ConfigurationError(
"Datahub server is not capable of supporting stateful ingestion."
" Please consider upgrading to the latest server version to use this feature."
"Datahub server is not capable of supporting stateful ingestion. "
"Please consider upgrading to the latest server version to use this feature."
)

@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext
) -> "DatahubIngestionCheckpointingProvider":
config = DatahubIngestionStateProviderConfig.parse_obj(config_dict)
if ctx.graph:
# Use the pipeline-level graph if set
if config.datahub_api is not None:
return cls(DataHubGraph(config.datahub_api))
elif ctx.graph:
# Use the pipeline-level graph if set.
return cls(ctx.graph)
else:
return cls(DataHubGraph(config.datahub_api))
raise ValueError(
"A graph instance is required. Either pass one in the pipeline context, or set it explicitly in the stateful ingestion provider config."
)

def _is_server_stateful_ingestion_capable(self) -> bool:
server_config = self.graph.get_config() if self.graph else None
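The new `create()` precedence is: an explicitly configured `datahub_api` wins, then the pipeline context's graph, and with neither a `ValueError` is raised. A sketch of the middle case (the provider's import path is assumed from the repo layout; the server URL is a placeholder):

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import (
    DatahubIngestionCheckpointingProvider,
)

ctx = PipelineContext(run_id="demo-run")
ctx.graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# datahub_api omitted -> falls through to the pipeline-level graph.
provider = DatahubIngestionCheckpointingProvider.create(config_dict={}, ctx=ctx)
```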
5 changes: 4 additions & 1 deletion metadata-ingestion/tests/conftest.py
@@ -25,7 +25,10 @@
docker_compose_command,
docker_compose_runner,
)
from tests.test_helpers.state_helpers import mock_datahub_graph # noqa: F401,E402
from tests.test_helpers.state_helpers import ( # noqa: F401,E402
mock_datahub_graph,
mock_datahub_graph_instance,
)

try:
# See https://github.com/spulec/freezegun/issues/98#issuecomment-590553475.
@@ -3,7 +3,6 @@
import pytest
from freezegun import freeze_time

from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.metadata import business_glossary
from tests.test_helpers import mce_helpers
@@ -41,7 +40,12 @@ def get_default_recipe(
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_glossary_ingest(
mock_datahub_graph, pytestconfig, tmp_path, mock_time, enable_auto_id, golden_file
mock_datahub_graph_instance,
pytestconfig,
tmp_path,
mock_time,
enable_auto_id,
golden_file,
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/business-glossary"

@@ -55,9 +59,7 @@ def test_glossary_ingest(
enable_auto_id=enable_auto_id,
)
)
pipeline.ctx.graph = mock_datahub_graph(
DatahubClientConfig()
) # Mock to resolve domain
pipeline.ctx.graph = mock_datahub_graph_instance
pipeline.run()
pipeline.raise_from_status()

10 changes: 9 additions & 1 deletion metadata-ingestion/tests/test_helpers/state_helpers.py
@@ -1,5 +1,5 @@
import types
from typing import Any, Dict, Optional, Type, cast
from typing import Any, Callable, Dict, Optional, Type, cast
from unittest.mock import MagicMock, create_autospec

import pytest
@@ -10,6 +10,7 @@
IngestionCheckpointingProviderBase,
)
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
@@ -101,6 +102,13 @@ def monkey_patch_get_latest_timeseries_value(
return mock_datahub_graph_ctx.mock_graph


@pytest.fixture
def mock_datahub_graph_instance(
mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph]
) -> DataHubGraph:
return mock_datahub_graph(DatahubClientConfig(server="http://fake.domain.local"))


def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint[GenericCheckpointState]]:
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/graph/test_client.py
@@ -11,7 +11,7 @@
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_get_aspect(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig())
graph = DataHubGraph(DatahubClientConfig(server="http://fake-domain.local"))
user_urn = "urn:li:corpuser:foo"
with patch("requests.Session.get") as mock_get:
mock_response = Mock()
@@ -12,17 +12,11 @@
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
DynamicTypedStateProviderConfig,
)
from datahub.ingestion.source.state.usage_common_state import (
BaseTimeWindowCheckpointState,
)
from datahub.utilities.time import datetime_to_ts_millis

GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"


@pytest.fixture
def stateful_source(mock_datahub_graph: DataHubGraph) -> Iterable[SnowflakeV2Source]:
@@ -39,9 +33,7 @@ def stateful_source(mock_datahub_graph: DataHubGraph) -> Iterable[SnowflakeV2Source]:
password="TST_PWD",
stateful_ingestion=StatefulStaleMetadataRemovalConfig(
enabled=True,
state_provider=DynamicTypedStateProviderConfig(
type="datahub", config={"datahub_api": {"server": GMS_SERVER}}
),
# Uses the graph from the pipeline context.
),
)

39 changes: 11 additions & 28 deletions metadata-ingestion/tests/unit/stateful_ingestion/test_configs.py
@@ -14,16 +14,12 @@
)

# 0. Common client configs.
datahub_client_configs: Dict[str, Any] = {
"full": {
"server": "http://localhost:8080",
"token": "dummy_test_tok",
"timeout_sec": 10,
"extra_headers": {},
"max_threads": 10,
},
"simple": {},
"default": {},
datahub_client_full_config = {
"server": "http://localhost:8080",
"token": "dummy_test_tok",
"timeout_sec": 10,
"extra_headers": {},
"max_threads": 10,
}


Expand All @@ -41,7 +37,7 @@
"checkpointing_valid_full_config": (
DatahubIngestionStateProviderConfig,
{
"datahub_api": datahub_client_configs["full"],
"datahub_api": datahub_client_full_config,
},
DatahubIngestionStateProviderConfig(
# This test verifies that the max_threads arg is ignored.
@@ -57,27 +53,14 @@
),
False,
),
# Simple config
"checkpointing_valid_simple_config": (
DatahubIngestionStateProviderConfig,
{
"datahub_api": datahub_client_configs["simple"],
},
DatahubIngestionStateProviderConfig(
datahub_api=DatahubClientConfig(
server="http://localhost:8080",
),
),
False,
),
# Default
"checkpointing_default": (
DatahubIngestionStateProviderConfig,
{
"datahub_api": datahub_client_configs["default"],
"datahub_api": None,
},
DatahubIngestionStateProviderConfig(
datahub_api=DatahubClientConfig(),
datahub_api=None,
),
False,
),
@@ -102,7 +85,7 @@
"max_checkpoint_state_size": 1024,
"state_provider": {
"type": "datahub",
"config": datahub_client_configs["full"],
"config": datahub_client_full_config,
},
"ignore_old_state": True,
"ignore_new_state": True,
@@ -114,7 +97,7 @@
ignore_new_state=True,
state_provider=DynamicTypedStateProviderConfig(
type="datahub",
config=datahub_client_configs["full"],
config=datahub_client_full_config,
),
),
False,
18 changes: 9 additions & 9 deletions metadata-ingestion/tests/unit/test_glue_source.py
@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple, Type, cast
from typing import Any, Dict, Optional, Tuple, Type, cast
from unittest.mock import patch

import pydantic
@@ -11,7 +11,7 @@
import datahub.metadata.schema_classes as models
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.aws.glue import (
GlueProfilingConfig,
@@ -74,7 +74,7 @@

def glue_source(
platform_instance: Optional[str] = None,
mock_datahub_graph: Optional[Callable[[DatahubClientConfig], DataHubGraph]] = None,
mock_datahub_graph_instance: Optional[DataHubGraph] = None,
use_s3_bucket_tags: bool = True,
use_s3_object_tags: bool = True,
extract_delta_schema_from_parameters: bool = False,
@@ -83,8 +83,8 @@ def glue_source(
extract_transforms: bool = True,
) -> GlueSource:
pipeline_context = PipelineContext(run_id="glue-source-tes")
if mock_datahub_graph:
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
if mock_datahub_graph_instance:
pipeline_context.graph = mock_datahub_graph_instance
return GlueSource(
ctx=pipeline_context,
config=GlueSourceConfig(
@@ -493,14 +493,14 @@ def test_glue_with_malformed_delta_schema_ingest(
def test_glue_ingest_include_table_lineage(
tmp_path: Path,
pytestconfig: PytestConfig,
mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph],
mock_datahub_graph_instance: DataHubGraph,
platform_instance: str,
mce_file: str,
mce_golden_file: str,
) -> None:
glue_source_instance = glue_source(
platform_instance=platform_instance,
mock_datahub_graph=mock_datahub_graph,
mock_datahub_graph_instance=mock_datahub_graph_instance,
emit_s3_lineage=True,
)

@@ -589,14 +589,14 @@ def test_glue_ingest_include_table_lineage(
def test_glue_ingest_include_column_lineage(
tmp_path: Path,
pytestconfig: PytestConfig,
mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph],
mock_datahub_graph_instance: DataHubGraph,
platform_instance: str,
mce_file: str,
mce_golden_file: str,
) -> None:
glue_source_instance = glue_source(
platform_instance=platform_instance,
mock_datahub_graph=mock_datahub_graph,
mock_datahub_graph_instance=mock_datahub_graph_instance,
emit_s3_lineage=True,
include_column_lineage=True,
use_s3_bucket_tags=False,