From 9f6580e176400776fbcd341614406814032681ec Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 8 Oct 2024 10:48:11 -0700
Subject: [PATCH 1/2] feat(ingest): add extra reporting for rest sink
 ASYNC_BATCH mode

---
 .../src/datahub/ingestion/graph/client.py       | 17 +++++++++--------
 .../src/datahub/ingestion/sink/datahub_rest.py  |  2 ++
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 0fdb7bb537457d..b9b0ed556e66c8 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -214,27 +214,28 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict:
     def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
         return self._send_restli_request("POST", url, json=payload_dict)
 
-    def _make_rest_sink_config(self) -> "DatahubRestSinkConfig":
-        from datahub.ingestion.sink.datahub_rest import (
-            DatahubRestSinkConfig,
-            RestSinkMode,
-        )
+    def _make_rest_sink_config(
+        self, extra_config: Optional[Dict] = None
+    ) -> "DatahubRestSinkConfig":
+        from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig
 
         # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter,
         # but initializing the rest sink creates another rest emitter.
         # TODO: We should refactor out the multithreading functionality of the sink
         # into a separate class that can be used by both the sink and the graph client
         # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
-        return DatahubRestSinkConfig(**self.config.dict(), mode=RestSinkMode.ASYNC)
+        return DatahubRestSinkConfig(**self.config.dict(), **(extra_config or {}))
 
     @contextlib.contextmanager
     def make_rest_sink(
-        self, run_id: str = _GRAPH_DUMMY_RUN_ID
+        self,
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
+        extra_sink_config: Optional[Dict] = None,
     ) -> Iterator["DatahubRestSink"]:
         from datahub.ingestion.api.common import PipelineContext
         from datahub.ingestion.sink.datahub_rest import DatahubRestSink
 
-        sink_config = self._make_rest_sink_config()
+        sink_config = self._make_rest_sink_config(extra_config=extra_sink_config)
         with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink:
             yield sink
         if sink.report.failures:
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
index 9059dcca3e2b85..5b4d3fe38ecd97 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -79,6 +79,7 @@ class DataHubRestSinkReport(SinkReport):
     gms_version: Optional[str] = None
 
     pending_requests: int = 0
+    async_batches_prepared: int = 0
     async_batches_split: int = 0
 
     main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
@@ -260,6 +261,7 @@ def _emit_batch_wrapper(
             events.append(event)
 
         chunks = self.emitter.emit_mcps(events)
+        self.report.async_batches_prepared += 1
         if chunks > 1:
             self.report.async_batches_split += chunks
             logger.info(
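Usage sketch (illustrative, not part of the patch): with the change above, a
caller can pass sink config overrides through `make_rest_sink` and read the new
`async_batches_prepared` counter afterwards. The server URL and the `mcps`
iterable below are hypothetical placeholders, and `emit_async` is assumed to be
the sink's existing fire-and-forget write method.

    from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
    from datahub.ingestion.sink.datahub_rest import RestSinkMode

    # Hypothetical connection details, for illustration only.
    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

    # Opt into ASYNC_BATCH mode via the new extra_sink_config passthrough;
    # any other DatahubRestSinkConfig field could be overridden the same way.
    with graph.make_rest_sink(
        extra_sink_config={"mode": RestSinkMode.ASYNC_BATCH}
    ) as sink:
        for mcp in mcps:  # mcps: an iterable of MetadataChangeProposalWrapper
            sink.emit_async(mcp)

    # After the context manager exits, the report reflects batching activity:
    # async_batches_prepared counts every batch handed to emit_mcps, while
    # async_batches_split only grows when a batch had to be split into chunks.
    print(sink.report.async_batches_prepared, sink.report.async_batches_split)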
From c73a6bb2b4657b8a00f6c456dcbfdcf5911775b4 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 8 Oct 2024 15:05:20 -0700
Subject: [PATCH 2/2] feat(ingest): support `__from_env__` special server value

---
 metadata-ingestion/src/datahub/cli/config_utils.py     | 7 +++++++
 metadata-ingestion/src/datahub/emitter/rest_emitter.py | 6 ++++++
 2 files changed, 13 insertions(+)

diff --git a/metadata-ingestion/src/datahub/cli/config_utils.py b/metadata-ingestion/src/datahub/cli/config_utils.py
index bb85809174ea96..5d9604de7836f9 100644
--- a/metadata-ingestion/src/datahub/cli/config_utils.py
+++ b/metadata-ingestion/src/datahub/cli/config_utils.py
@@ -84,6 +84,13 @@ def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]:
     return url or host, token
 
 
+def require_config_from_env() -> Tuple[str, Optional[str]]:
+    host, token = _get_config_from_env()
+    if host is None:
+        raise MissingConfigError("No GMS host was provided in env variables.")
+    return host, token
+
+
 def load_client_config() -> DatahubClientConfig:
     gms_host_env, gms_token_env = _get_config_from_env()
     if gms_host_env:
diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
index e370ad3562a06b..948060c3c4f44c 100644
--- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -76,6 +76,12 @@ def __init__(
     ):
         if not gms_server:
             raise ConfigurationError("gms server is required")
+        if gms_server == "__from_env__" and token is None:
+            # HACK: similar to what we do with system auth, we transparently
+            # inject the config in here. Ideally this should be done in the
+            # config loader or by the caller, but it gets the job done for now.
+            gms_server, token = config_utils.require_config_from_env()
+
         self._gms_server = fixup_gms_url(gms_server)
         self._token = token
         self.server_config: Dict[str, Any] = {}
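Usage sketch (illustrative, not part of the patch): the `__from_env__` sentinel
defers server and token resolution to the environment variables that
`config_utils` already reads (assumed here to be `DATAHUB_GMS_URL` and
`DATAHUB_GMS_TOKEN`). Note that it only takes effect when no token is passed
explicitly.

    import os

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    # Assumed environment setup; in practice these would already be set.
    os.environ["DATAHUB_GMS_URL"] = "http://localhost:8080"
    os.environ["DATAHUB_GMS_TOKEN"] = "<personal-access-token>"

    # The sentinel is swapped for the real server and token at construction
    # time; MissingConfigError is raised if no GMS host is configured in env.
    emitter = DatahubRestEmitter(gms_server="__from_env__")
    emitter.test_connection()  # verifies connectivity against the resolved server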