
feat(ingest): support __from_env__ special server value #11569

Merged: 2 commits, Oct 9, 2024
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/cli/config_utils.py
@@ -84,6 +84,13 @@ def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]:
     return url or host, token
 
 
+def require_config_from_env() -> Tuple[str, Optional[str]]:
+    host, token = _get_config_from_env()
+    if host is None:
+        raise MissingConfigError("No GMS host was provided in env variables.")
+    return host, token
+
+
 def load_client_config() -> DatahubClientConfig:
     gms_host_env, gms_token_env = _get_config_from_env()
     if gms_host_env:
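For orientation, a minimal sketch of how a caller might use the new helper. The environment variable names below (DATAHUB_GMS_URL, DATAHUB_GMS_TOKEN) are an assumption about what _get_config_from_env reads; they do not appear in this diff:

    import os

    from datahub.cli import config_utils

    # Assumed variable names; _get_config_from_env falls back to a host/port
    # pair when no URL is set, per the `url or host` return above.
    os.environ["DATAHUB_GMS_URL"] = "http://localhost:8080"
    os.environ["DATAHUB_GMS_TOKEN"] = "example-token"

    host, token = config_utils.require_config_from_env()
    # Unlike _get_config_from_env, this raises MissingConfigError when no
    # host/URL is configured, so `host` is guaranteed to be a str here.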
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -76,6 +76,12 @@ def __init__(
     ):
         if not gms_server:
             raise ConfigurationError("gms server is required")
+        if gms_server == "__from_env__" and token is None:
+            # HACK: similar to what we do with system auth, we transparently
+            # inject the config in here. Ideally this should be done in the
+            # config loader or by the caller, but it gets the job done for now.
+            gms_server, token = config_utils.require_config_from_env()
+
         self._gms_server = fixup_gms_url(gms_server)
         self._token = token
         self.server_config: Dict[str, Any] = {}
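A short usage sketch of the new special value; the emitter class and sentinel string come from this diff, while the environment variable names are the same assumption as in the example above:

    import os

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    os.environ["DATAHUB_GMS_URL"] = "http://localhost:8080"
    os.environ["DATAHUB_GMS_TOKEN"] = "example-token"

    # The sentinel is replaced with the env-provided URL and token before
    # fixup_gms_url runs; passing an explicit token disables the swap.
    emitter = DatahubRestEmitter(gms_server="__from_env__")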
17 changes: 9 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -214,27 +214,28 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict:
     def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
         return self._send_restli_request("POST", url, json=payload_dict)
 
-    def _make_rest_sink_config(self) -> "DatahubRestSinkConfig":
-        from datahub.ingestion.sink.datahub_rest import (
-            DatahubRestSinkConfig,
-            RestSinkMode,
-        )
+    def _make_rest_sink_config(
+        self, extra_config: Optional[Dict] = None
+    ) -> "DatahubRestSinkConfig":
+        from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig
 
         # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter,
         # but initializing the rest sink creates another rest emitter.
         # TODO: We should refactor out the multithreading functionality of the sink
         # into a separate class that can be used by both the sink and the graph client
         # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
-        return DatahubRestSinkConfig(**self.config.dict(), mode=RestSinkMode.ASYNC)
+        return DatahubRestSinkConfig(**self.config.dict(), **(extra_config or {}))
 
     @contextlib.contextmanager
     def make_rest_sink(
-        self, run_id: str = _GRAPH_DUMMY_RUN_ID
+        self,
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
+        extra_sink_config: Optional[Dict] = None,
     ) -> Iterator["DatahubRestSink"]:
         from datahub.ingestion.api.common import PipelineContext
         from datahub.ingestion.sink.datahub_rest import DatahubRestSink
 
-        sink_config = self._make_rest_sink_config()
+        sink_config = self._make_rest_sink_config(extra_config=extra_sink_config)
         with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink:
             yield sink
             if sink.report.failures:
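A hedged sketch of what the new extra_sink_config hook permits. Note a subtle default change visible in the diff: the sink config no longer forces mode=RestSinkMode.ASYNC, so a caller wanting the old behavior could pass it back in explicitly:

    from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
    from datahub.ingestion.sink.datahub_rest import RestSinkMode

    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

    # extra_sink_config entries are merged into DatahubRestSinkConfig, so any
    # of its fields can be overridden; restoring the previously hardcoded
    # async mode is one example.
    with graph.make_rest_sink(extra_sink_config={"mode": RestSinkMode.ASYNC}) as sink:
        ...  # emit MCPs through the sink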
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -79,6 +79,7 @@ class DataHubRestSinkReport(SinkReport):
     gms_version: Optional[str] = None
     pending_requests: int = 0
 
+    async_batches_prepared: int = 0
     async_batches_split: int = 0
 
     main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
@@ -260,6 +261,7 @@ def _emit_batch_wrapper(
             events.append(event)
 
         chunks = self.emitter.emit_mcps(events)
+        self.report.async_batches_prepared += 1
        if chunks > 1:
            self.report.async_batches_split += chunks
            logger.info(
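Finally, a hypothetical reader-side helper (not part of this PR) illustrating why the new counter is useful: it gives async_batches_split a denominator:

    from datahub.ingestion.sink.datahub_rest import DataHubRestSinkReport

    def batch_split_ratio(report: DataHubRestSinkReport) -> float:
        # async_batches_prepared increments once per batch handed to emit_mcps;
        # async_batches_split accumulates chunk counts only for batches that
        # were split into more than one request.
        if report.async_batches_prepared == 0:
            return 0.0
        return report.async_batches_split / report.async_batches_prepared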