From aaecc5432674bd382f4e49e96832fc1b879bcf02 Mon Sep 17 00:00:00 2001 From: Ilia Kurenkov Date: Thu, 20 Feb 2025 14:45:40 +0100 Subject: [PATCH] Revert "Gracefully handle unavailable apps and their aspects (#19561)" This reverts commit 7c6d13f0d22062f1e05b38a913a4e244c88a18ac. --- spark/changelog.d/19561.fixed | 1 - spark/datadog_checks/spark/spark.py | 123 +++++++++++++++++----------- spark/tests/common.py | 2 +- spark/tests/test_spark.py | 51 ------------ 4 files changed, 74 insertions(+), 103 deletions(-) delete mode 100644 spark/changelog.d/19561.fixed diff --git a/spark/changelog.d/19561.fixed b/spark/changelog.d/19561.fixed deleted file mode 100644 index 9f62cba69f7e7..0000000000000 --- a/spark/changelog.d/19561.fixed +++ /dev/null @@ -1 +0,0 @@ -Gracefully handle unavailable apps and their aspects. Before we would throw an exception as soon as we encountered an error, which left a lot of available metrics uncollected. diff --git a/spark/datadog_checks/spark/spark.py b/spark/datadog_checks/spark/spark.py index 3c2f15ba4a30d..49270e6af450c 100644 --- a/spark/datadog_checks/spark/spark.py +++ b/spark/datadog_checks/spark/spark.py @@ -392,45 +392,33 @@ def _get_spark_app_ids(self, running_apps, tags): return spark_apps - def _describe_app(self, property, running_apps, addl_tags): + def _spark_job_metrics(self, running_apps, addl_tags): """ - Get payloads that describe certain property of the running apps. - - Examples of "aspects": - - the app's jobs - - the app's spark stages + Get metrics for each Spark job. """ for app_id, (app_name, tracking_url) in running_apps.items(): + base_url = self._get_request_url(tracking_url) - response = self._rest_request(base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, property) - try: - yield (response.json(), [f'app_name:{app_name}'] + addl_tags) - except JSONDecodeError: - self.log.debug( - 'Skipping metrics for %s from app %s due to unparseable JSON payload.', property, app_name - ) - continue + response = self._rest_request_to_json( + base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'jobs' + ) - def _spark_job_metrics(self, running_apps, addl_tags): - """ - Get metrics for each Spark job. - """ - for jobs, app_tags in self._describe_app('jobs', running_apps, addl_tags): - for job in jobs: - job_tags = [] + for job in response: status = job.get('status') - job_tags.append('status:%s' % str(status).lower()) + + tags = ['app_name:%s' % str(app_name)] + tags.extend(addl_tags) + tags.append('status:%s' % str(status).lower()) job_id = job.get('jobId') if job_id is not None: - job_tags.append('job_id:{}'.format(job_id)) + tags.append('job_id:{}'.format(job_id)) if not self._disable_spark_job_stage_tags: for stage_id in job.get('stageIds', []): - job_tags.append('stage_id:{}'.format(stage_id)) + tags.append('stage_id:{}'.format(stage_id)) - tags = app_tags + job_tags self._set_metrics_from_json(tags, job, SPARK_JOB_METRICS) self._set_metric('spark.job.count', COUNT, 1, tags) @@ -438,18 +426,25 @@ def _spark_stage_metrics(self, running_apps, addl_tags): """ Get metrics for each Spark stage. """ - for stages, app_tags in self._describe_app('stages', running_apps, addl_tags): - for stage in stages: - stage_tags = [] + for app_id, (app_name, tracking_url) in running_apps.items(): + + base_url = self._get_request_url(tracking_url) + response = self._rest_request_to_json( + base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'stages' + ) + + for stage in response: status = stage.get('status') - stage_tags.append('status:%s' % str(status).lower()) + + tags = ['app_name:%s' % str(app_name)] + tags.extend(addl_tags) + tags.append('status:%s' % str(status).lower()) stage_id = stage.get('stageId') if stage_id is not None: - stage_tags.append('stage_id:{}'.format(stage_id)) + tags.append('stage_id:{}'.format(stage_id)) - tags = app_tags + stage_tags self._set_metrics_from_json(tags, stage, SPARK_STAGE_METRICS) self._set_metric('spark.stage.count', COUNT, 1, tags) @@ -457,45 +452,73 @@ def _spark_executor_metrics(self, running_apps, addl_tags): """ Get metrics for each Spark executor. """ - for executors, app_tags in self._describe_app('executors', running_apps, addl_tags): - for executor in executors: + for app_id, (app_name, tracking_url) in running_apps.items(): + + base_url = self._get_request_url(tracking_url) + response = self._rest_request_to_json( + base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'executors' + ) + + tags = ['app_name:%s' % str(app_name)] + tags.extend(addl_tags) + + for executor in response: if executor.get('id') == 'driver': - self._set_metrics_from_json(app_tags, executor, SPARK_DRIVER_METRICS) + self._set_metrics_from_json(tags, executor, SPARK_DRIVER_METRICS) else: - self._set_metrics_from_json(app_tags, executor, SPARK_EXECUTOR_METRICS) + self._set_metrics_from_json(tags, executor, SPARK_EXECUTOR_METRICS) if is_affirmative(self.instance.get('executor_level_metrics', False)): self._set_metrics_from_json( - app_tags + ['executor_id:{}'.format(executor.get('id', 'unknown'))], + tags + ['executor_id:{}'.format(executor.get('id', 'unknown'))], executor, SPARK_EXECUTOR_LEVEL_METRICS, ) - if executors: - self._set_metric('spark.executor.count', COUNT, len(executors), app_tags) + if len(response): + self._set_metric('spark.executor.count', COUNT, len(response), tags) def _spark_rdd_metrics(self, running_apps, addl_tags): """ Get metrics for each Spark RDD. """ - for rdds, app_tags in self._describe_app('storage/rdd', running_apps, addl_tags): - for rdd in rdds: - self._set_metrics_from_json(app_tags, rdd, SPARK_RDD_METRICS) + for app_id, (app_name, tracking_url) in running_apps.items(): + + base_url = self._get_request_url(tracking_url) + response = self._rest_request_to_json( + base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'storage/rdd' + ) - if rdds: - self._set_metric('spark.rdd.count', COUNT, len(rdds), app_tags) + tags = ['app_name:%s' % str(app_name)] + tags.extend(addl_tags) + + for rdd in response: + self._set_metrics_from_json(tags, rdd, SPARK_RDD_METRICS) + + if len(response): + self._set_metric('spark.rdd.count', COUNT, len(response), tags) def _spark_streaming_statistics_metrics(self, running_apps, addl_tags): """ Get metrics for each application streaming statistics. """ - try: - for stats, app_tags in self._describe_app('streaming/statistics', running_apps, addl_tags): - self.log.debug('streaming/statistics: %s', stats) - self._set_metrics_from_json(app_tags, stats, SPARK_STREAMING_STATISTICS_METRICS) - except HTTPError as e: - self.log.debug("Got an error collecting streaming/statistics", e, exc_info=True) - pass + for app_id, (app_name, tracking_url) in running_apps.items(): + try: + base_url = self._get_request_url(tracking_url) + response = self._rest_request_to_json( + base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'streaming/statistics' + ) + self.log.debug('streaming/statistics: %s', response) + tags = ['app_name:%s' % str(app_name)] + tags.extend(addl_tags) + + # NOTE: response is a dict + self._set_metrics_from_json(tags, response, SPARK_STREAMING_STATISTICS_METRICS) + except HTTPError as e: + # NOTE: If api call returns response 404 + # then it means that the application is not a streaming application, we should skip metric submission + if e.response.status_code != 404: + raise def _spark_structured_streams_metrics(self, running_apps, addl_tags): """ diff --git a/spark/tests/common.py b/spark/tests/common.py index ec5b6d0474096..6d9b7d160f521 100644 --- a/spark/tests/common.py +++ b/spark/tests/common.py @@ -16,6 +16,7 @@ EXPECTED_E2E_METRICS = [ 'spark.driver.total_shuffle_read', 'spark.stage.num_active_tasks', + 'spark.streaming.statistics.num_inactive_receivers', 'spark.driver.max_memory', 'spark.streaming.statistics.num_active_batches', 'spark.driver.total_tasks', @@ -90,7 +91,6 @@ 'spark.structured_streaming.input_rate', 'spark.streaming.statistics.avg_input_rate', 'spark.streaming.statistics.avg_scheduling_delay', - 'spark.streaming.statistics.num_inactive_receivers', # The peak memory metrics are only available in Spark 3.0+ and after one loop of garbage collection 'spark.driver.peak_mem.jvm_heap_memory', 'spark.driver.peak_mem.jvm_off_heap_memory', diff --git a/spark/tests/test_spark.py b/spark/tests/test_spark.py index 599726f801606..9610c01e397a6 100644 --- a/spark/tests/test_spark.py +++ b/spark/tests/test_spark.py @@ -1248,57 +1248,6 @@ def test_no_running_apps(aggregator, dd_run_check, instance, service_check, capl assert 'No running apps found. No metrics will be collected.' in caplog.text -@pytest.mark.unit -@pytest.mark.parametrize( - 'property_url, missing_metrics', - [ - pytest.param(YARN_SPARK_JOB_URL, SPARK_JOB_RUNNING_METRIC_VALUES, id='jobs'), - pytest.param(YARN_SPARK_STAGE_URL, SPARK_STAGE_RUNNING_METRIC_VALUES, id='stages'), - pytest.param( - YARN_SPARK_EXECUTOR_URL, - SPARK_EXECUTOR_METRIC_VALUES.keys() | SPARK_EXECUTOR_LEVEL_METRIC_VALUES.keys(), - id='executors', - ), - pytest.param(YARN_SPARK_RDD_URL, SPARK_RDD_METRIC_VALUES, id='storage/rdd'), - pytest.param( - YARN_SPARK_STREAMING_STATISTICS_URL, SPARK_STREAMING_STATISTICS_METRIC_VALUES, id='streaming/statistics' - ), - ], -) -def test_yarn_no_json_for_app_properties(aggregator, dd_run_check, mocker, property_url, missing_metrics): - """ - In some yarn deployments apps stop exposing properties (such as jobs and stages) by the time we query them. - - In these cases we skip only the specific missing apps and metrics while collecting all others. - """ - - def get_without_json(url, *args, **kwargs): - arg_url = Url(url) - if arg_url == property_url: - return MockResponse(content="") # this should trigger json error - return yarn_requests_get_mock(url, *args, **kwargs) - - mocker.patch('requests.get', get_without_json) - dd_run_check(SparkCheck('spark', {}, [YARN_CONFIG])) - for m in missing_metrics: - aggregator.assert_metric(m, count=0) - - -@pytest.mark.unit -@pytest.mark.parametrize('status_code', [404, 500]) -def test_yarn_streaming_metrics_http_error(aggregator, dd_run_check, mocker, caplog, status_code): - def get_raise_error(url, *args, **kwargs): - arg_url = Url(url) - if arg_url == YARN_SPARK_STREAMING_STATISTICS_URL: - return MockResponse(status_code=status_code) - return yarn_requests_get_mock(url, *args, **kwargs) - - mocker.patch('requests.get', get_raise_error) - dd_run_check(SparkCheck('spark', {}, [YARN_CONFIG])) - for m in SPARK_STREAMING_STATISTICS_METRIC_VALUES: - aggregator.assert_metric(m, count=0) - - class StandaloneAppsResponseHandler(BaseHTTPServer.BaseHTTPRequestHandler): def do_GET(self): self.send_response(200)