Commit
feat(ingest): add extra reporting for rest sink ASYNC_BATCH mode (dat…
hsheth2 authored and sleeperdeep committed Dec 17, 2024
1 parent f03fb9a commit a94b88d
Showing 2 changed files with 11 additions and 8 deletions.
17 changes: 9 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -214,27 +214,28 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict:
     def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
         return self._send_restli_request("POST", url, json=payload_dict)
 
-    def _make_rest_sink_config(self) -> "DatahubRestSinkConfig":
-        from datahub.ingestion.sink.datahub_rest import (
-            DatahubRestSinkConfig,
-            RestSinkMode,
-        )
+    def _make_rest_sink_config(
+        self, extra_config: Optional[Dict] = None
+    ) -> "DatahubRestSinkConfig":
+        from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig
 
         # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter,
         # but initializing the rest sink creates another rest emitter.
         # TODO: We should refactor out the multithreading functionality of the sink
         # into a separate class that can be used by both the sink and the graph client
         # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
-        return DatahubRestSinkConfig(**self.config.dict(), mode=RestSinkMode.ASYNC)
+        return DatahubRestSinkConfig(**self.config.dict(), **(extra_config or {}))
 
     @contextlib.contextmanager
     def make_rest_sink(
-        self, run_id: str = _GRAPH_DUMMY_RUN_ID
+        self,
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
+        extra_sink_config: Optional[Dict] = None,
     ) -> Iterator["DatahubRestSink"]:
         from datahub.ingestion.api.common import PipelineContext
         from datahub.ingestion.sink.datahub_rest import DatahubRestSink
 
-        sink_config = self._make_rest_sink_config()
+        sink_config = self._make_rest_sink_config(extra_config=extra_sink_config)
         with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink:
             yield sink
             if sink.report.failures:
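The client.py change replaces the hardcoded `mode=RestSinkMode.ASYNC` with a pass-through dict, so callers can override any sink option. A minimal sketch of how this might be used to opt into ASYNC_BATCH mode (the use case named in the commit title); the server URL and dataset URN are placeholders, and `emit_async` is assumed to be the sink's fire-and-forget emit method rather than confirmed by this diff:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.sink.datahub_rest import RestSinkMode
from datahub.metadata.schema_classes import StatusClass

# Placeholder connection details.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# extra_sink_config is merged into DatahubRestSinkConfig, so any sink option
# can be overridden here; "mode" selects the async batching behavior.
with graph.make_rest_sink(
    extra_sink_config={"mode": RestSinkMode.ASYNC_BATCH}
) as sink:
    sink.emit_async(
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
            aspect=StatusClass(removed=False),
        )
    )
```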
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -79,6 +79,7 @@ class DataHubRestSinkReport(SinkReport):
     gms_version: Optional[str] = None
     pending_requests: int = 0
 
+    async_batches_prepared: int = 0
     async_batches_split: int = 0
 
     main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
@@ -260,6 +261,7 @@ def _emit_batch_wrapper(
             events.append(event)
 
         chunks = self.emitter.emit_mcps(events)
+        self.report.async_batches_prepared += 1
         if chunks > 1:
            self.report.async_batches_split += chunks
            logger.info(
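The new counter gives the existing `async_batches_split` metric a denominator: `async_batches_prepared` counts every batch handed to `emit_mcps`, while `async_batches_split` accumulates the full chunk count whenever a batch is broken into multiple requests. A hypothetical way to read the two together after a run; the field names come from the diff above, but this check is an illustration, not part of DataHub:

```python
# Assumes `sink` is a DatahubRestSink, e.g. from the make_rest_sink sketch
# earlier. Field names match the diff; the reporting logic is illustrative.
report = sink.report
print(f"async batches prepared: {report.async_batches_prepared}")
print(f"chunks from split batches: {report.async_batches_split}")

# Because async_batches_split adds the full chunk count (always > 1) for each
# split batch, it can exceed async_batches_prepared when payloads routinely
# overflow the per-request size limit.
if report.async_batches_split > report.async_batches_prepared:
    print("many batches are being split into multiple requests")
```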
