fix(ingest): disable reporting for dry-run pipelines #11306

Merged · 4 commits · Sep 9, 2024
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/check_cli.py
@@ -68,7 +68,7 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
"config": {"filename": out_file.name},
},
},
- no_default_report=True,
+ report_to=None,
)

pipeline.run()
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -1001,7 +1001,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
if token is not None:
recipe["sink"]["config"]["token"] = token

- pipeline = Pipeline.create(recipe, no_default_report=True)
+ pipeline = Pipeline.create(recipe, report_to=None)
pipeline.run()
ret = pipeline.pretty_print_summary()
sys.exit(ret)
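
For reference, the call-site pattern after this change looks roughly like the following. This is a minimal sketch rather than code from the PR; the recipe dict is illustrative and its source/sink config keys are assumptions.

from datahub.ingestion.run.pipeline import Pipeline

# Illustrative recipe; not taken from this PR.
recipe = {
    "source": {"type": "file", "config": {"path": "metadata_events.json"}},
    "sink": {"type": "console", "config": {}},
}

# Before: the default "datahub" reporter was suppressed with a dedicated flag:
#   Pipeline.create(recipe, no_default_report=True)
# After this PR: passing report_to=None disables the default reporter.
pipeline = Pipeline.create(recipe, report_to=None)
pipeline.run()
exit_code = pipeline.pretty_print_summary()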
29 changes: 16 additions & 13 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -119,7 +119,7 @@ def run(
strict_warnings: bool,
preview_workunits: int,
test_source_connection: bool,
- report_to: str,
+ report_to: Optional[str],
no_default_report: bool,
no_spinner: bool,
no_progress: bool,
@@ -160,7 +160,11 @@ async def run_pipeline_to_completion(pipeline: Pipeline) -> int:
raw_pipeline_config = pipeline_config.pop("__raw_config")

if test_source_connection:
- _test_source_connection(report_to, pipeline_config)
+ sys.exit(_test_source_connection(report_to, pipeline_config))
+
+ if no_default_report:
+ # The default is "datahub" reporting. The extra flag will disable it.
+ report_to = None

async def run_ingestion_and_check_upgrade() -> int:
# TRICKY: We want to make sure that the Pipeline.create() call happens on the
@@ -171,13 +175,12 @@ async def run_ingestion_and_check_upgrade() -> int:
# logger.debug(f"Using config: {pipeline_config}")
pipeline = Pipeline.create(
pipeline_config,
- dry_run,
- preview,
- preview_workunits,
- report_to,
- no_default_report,
- no_progress,
- raw_pipeline_config,
+ dry_run=dry_run,
+ preview_mode=preview,
+ preview_workunits=preview_workunits,
+ report_to=report_to,
+ no_progress=no_progress,
+ raw_config=raw_pipeline_config,
)

version_stats_future = asyncio.ensure_future(
@@ -392,7 +395,7 @@ def deploy(
click.echo(response)


- def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
+ def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> int:
connection_report = None
try:
connection_report = ConnectionManager().test_source_connection(pipeline_config)
@@ -401,12 +404,12 @@ def _test_source_connection(report_to: Optional[str], pipeline_config: dict) ->
with open(report_to, "w") as out_fp:
out_fp.write(connection_report.as_json())
logger.info(f"Wrote report successfully to {report_to}")
- sys.exit(0)
+ return 0
except Exception as e:
logger.error(f"Failed to test connection due to {e}")
if connection_report:
logger.error(connection_report.as_json())
- sys.exit(1)
+ return 1


def parse_restli_response(response):
@@ -447,7 +450,7 @@ def mcps(path: str) -> None:
},
}

- pipeline = Pipeline.create(recipe, no_default_report=True)
+ pipeline = Pipeline.create(recipe, report_to=None)
pipeline.run()
ret = pipeline.pretty_print_summary()
sys.exit(ret)
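
Taken together, the CLI run path now reduces to roughly the following. This is a condensed, hypothetical restatement of the hunks above for illustration only; run_ingest is not a real function name, unrelated options are omitted, and _test_source_connection refers to the module-private helper shown above.

import sys
from typing import Optional

from datahub.ingestion.run.pipeline import Pipeline

def run_ingest(
    pipeline_config: dict,
    report_to: Optional[str],
    no_default_report: bool,
    test_source_connection: bool,
) -> None:
    if test_source_connection:
        # _test_source_connection() now returns an exit code instead of calling
        # sys.exit() itself, so the caller decides how the process terminates.
        sys.exit(_test_source_connection(report_to, pipeline_config))

    if no_default_report:
        # The no_default_report flag now just normalizes to report_to=None
        # before the pipeline is built; Pipeline no longer knows about the flag.
        report_to = None

    pipeline = Pipeline.create(
        pipeline_config,
        report_to=report_to,  # None disables reporting entirely
    )
    pipeline.run()
    sys.exit(pipeline.pretty_print_summary())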
25 changes: 13 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -221,7 +221,6 @@ def __init__(
preview_mode: bool = False,
preview_workunits: int = 10,
report_to: Optional[str] = None,
- no_default_report: bool = False,
no_progress: bool = False,
):
self.config = config
@@ -279,7 +278,7 @@ def __init__(

with set_graph_context(self.graph):
with _add_init_error_context("configure reporters"):
- self._configure_reporting(report_to, no_default_report)
+ self._configure_reporting(report_to)

with _add_init_error_context(
f"find a registered source for type {self.source_type}"
@@ -326,15 +325,19 @@ def _configure_transforms(self) -> None:
# Add the system metadata transformer at the end of the list.
self.transformers.append(SystemMetadataTransformer(self.ctx))

- def _configure_reporting(
- self, report_to: Optional[str], no_default_report: bool
- ) -> None:
- if report_to == "datahub":
+ def _configure_reporting(self, report_to: Optional[str]) -> None:
+ if self.dry_run:
+ # In dry run mode, we don't want to report anything.
+ return
+
+ if not report_to:
+ # Reporting is disabled.
+ pass
+ elif report_to == "datahub":
Review comment (Contributor):
We should move this string to const at some point

Reply (Collaborator, Author):
yup - can do that in a follow up

(A sketch of that possible follow-up appears after this hunk.)

# we add the default datahub reporter unless a datahub reporter is already configured
- if not no_default_report and (
- not self.config.reporting
- or "datahub" not in [x.type for x in self.config.reporting]
- ):
+ if not self.config.reporting or "datahub" not in [
+ reporter.type for reporter in self.config.reporting
+ ]:
self.config.reporting.append(
ReporterConfig.parse_obj({"type": "datahub"})
)
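
The follow-up suggested in the review thread above could look something like this. It is a sketch only; the constant name and helper are hypothetical and not part of this PR.

from typing import Optional

# Hypothetical module-level constant replacing the inline "datahub" literal.
DEFAULT_REPORTING_TYPE = "datahub"

def wants_default_reporter(report_to: Optional[str]) -> bool:
    # True only when the caller asked for the default DataHub reporter.
    return report_to == DEFAULT_REPORTING_TYPE

assert wants_default_reporter("datahub")
assert not wants_default_reporter(None)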
@@ -409,7 +412,6 @@ def create(
preview_mode: bool = False,
preview_workunits: int = 10,
report_to: Optional[str] = "datahub",
- no_default_report: bool = False,
no_progress: bool = False,
raw_config: Optional[dict] = None,
) -> "Pipeline":
@@ -420,7 +422,6 @@
preview_mode=preview_mode,
preview_workunits=preview_workunits,
report_to=report_to,
- no_default_report=no_default_report,
no_progress=no_progress,
)

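In summary, the reporting decision after this PR behaves as sketched below. This is a standalone approximation using stand-in config classes rather than the real Pipeline internals; the file-path branch of the real method is omitted.

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class StubReporter:
    type: str

@dataclass
class StubConfig:
    reporting: List[StubReporter] = field(default_factory=list)

def configure_reporting(config: StubConfig, report_to: Optional[str], dry_run: bool) -> None:
    # Mirrors the spirit of the new Pipeline._configure_reporting().
    if dry_run:
        # Dry-run pipelines never report, regardless of report_to.
        return
    if not report_to:
        # report_to=None disables reporting entirely (replaces no_default_report).
        return
    if report_to == "datahub":
        # Add the default datahub reporter unless one is already configured.
        if all(r.type != "datahub" for r in config.reporting):
            config.reporting.append(StubReporter(type="datahub"))
    # (The real method also accepts other report_to values; omitted here.)

cfg = StubConfig()
configure_reporting(cfg, report_to="datahub", dry_run=True)
assert cfg.reporting == []  # dry run: nothing is reported

configure_reporting(cfg, report_to="datahub", dry_run=False)
assert [r.type for r in cfg.reporting] == ["datahub"]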