Revert "Gracefully handle unavailable apps and their aspects (#19561)" #19663

Merged
merged 1 commit on Feb 20, 2025
1 change: 0 additions & 1 deletion spark/changelog.d/19561.fixed

This file was deleted.

123 changes: 73 additions & 50 deletions spark/datadog_checks/spark/spark.py
@@ -392,110 +392,133 @@ def _get_spark_app_ids(self, running_apps, tags):
 
         return spark_apps
 
-    def _describe_app(self, property, running_apps, addl_tags):
+    def _spark_job_metrics(self, running_apps, addl_tags):
         """
-        Get payloads that describe certain property of the running apps.
-
-        Examples of "aspects":
-        - the app's jobs
-        - the app's spark stages
+        Get metrics for each Spark job.
         """
         for app_id, (app_name, tracking_url) in running_apps.items():
 
             base_url = self._get_request_url(tracking_url)
-            response = self._rest_request(base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, property)
-            try:
-                yield (response.json(), [f'app_name:{app_name}'] + addl_tags)
-            except JSONDecodeError:
-                self.log.debug(
-                    'Skipping metrics for %s from app %s due to unparseable JSON payload.', property, app_name
-                )
-                continue
-
-    def _spark_job_metrics(self, running_apps, addl_tags):
-        """
-        Get metrics for each Spark job.
-        """
-        for jobs, app_tags in self._describe_app('jobs', running_apps, addl_tags):
-            for job in jobs:
-                job_tags = []
+            response = self._rest_request_to_json(
+                base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'jobs'
+            )
+
+            for job in response:
 
                 status = job.get('status')
-                job_tags.append('status:%s' % str(status).lower())
+
+                tags = ['app_name:%s' % str(app_name)]
+                tags.extend(addl_tags)
+                tags.append('status:%s' % str(status).lower())
 
                 job_id = job.get('jobId')
                 if job_id is not None:
-                    job_tags.append('job_id:{}'.format(job_id))
+                    tags.append('job_id:{}'.format(job_id))
 
                 if not self._disable_spark_job_stage_tags:
                     for stage_id in job.get('stageIds', []):
-                        job_tags.append('stage_id:{}'.format(stage_id))
+                        tags.append('stage_id:{}'.format(stage_id))
 
-                tags = app_tags + job_tags
                 self._set_metrics_from_json(tags, job, SPARK_JOB_METRICS)
                 self._set_metric('spark.job.count', COUNT, 1, tags)
 
     def _spark_stage_metrics(self, running_apps, addl_tags):
         """
         Get metrics for each Spark stage.
         """
-        for stages, app_tags in self._describe_app('stages', running_apps, addl_tags):
-            for stage in stages:
-                stage_tags = []
+        for app_id, (app_name, tracking_url) in running_apps.items():
+
+            base_url = self._get_request_url(tracking_url)
+            response = self._rest_request_to_json(
+                base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'stages'
+            )
+
+            for stage in response:
 
                 status = stage.get('status')
-                stage_tags.append('status:%s' % str(status).lower())
+
+                tags = ['app_name:%s' % str(app_name)]
+                tags.extend(addl_tags)
+                tags.append('status:%s' % str(status).lower())
 
                 stage_id = stage.get('stageId')
                 if stage_id is not None:
-                    stage_tags.append('stage_id:{}'.format(stage_id))
+                    tags.append('stage_id:{}'.format(stage_id))
 
-                tags = app_tags + stage_tags
                 self._set_metrics_from_json(tags, stage, SPARK_STAGE_METRICS)
                 self._set_metric('spark.stage.count', COUNT, 1, tags)
 
     def _spark_executor_metrics(self, running_apps, addl_tags):
         """
         Get metrics for each Spark executor.
         """
-        for executors, app_tags in self._describe_app('executors', running_apps, addl_tags):
-            for executor in executors:
+        for app_id, (app_name, tracking_url) in running_apps.items():
+
+            base_url = self._get_request_url(tracking_url)
+            response = self._rest_request_to_json(
+                base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'executors'
+            )
+
+            tags = ['app_name:%s' % str(app_name)]
+            tags.extend(addl_tags)
+
+            for executor in response:
                 if executor.get('id') == 'driver':
-                    self._set_metrics_from_json(app_tags, executor, SPARK_DRIVER_METRICS)
+                    self._set_metrics_from_json(tags, executor, SPARK_DRIVER_METRICS)
                 else:
-                    self._set_metrics_from_json(app_tags, executor, SPARK_EXECUTOR_METRICS)
+                    self._set_metrics_from_json(tags, executor, SPARK_EXECUTOR_METRICS)
 
                     if is_affirmative(self.instance.get('executor_level_metrics', False)):
                         self._set_metrics_from_json(
-                            app_tags + ['executor_id:{}'.format(executor.get('id', 'unknown'))],
+                            tags + ['executor_id:{}'.format(executor.get('id', 'unknown'))],
                             executor,
                             SPARK_EXECUTOR_LEVEL_METRICS,
                         )
 
-            if executors:
-                self._set_metric('spark.executor.count', COUNT, len(executors), app_tags)
+            if len(response):
+                self._set_metric('spark.executor.count', COUNT, len(response), tags)
 
     def _spark_rdd_metrics(self, running_apps, addl_tags):
         """
         Get metrics for each Spark RDD.
         """
-        for rdds, app_tags in self._describe_app('storage/rdd', running_apps, addl_tags):
-            for rdd in rdds:
-                self._set_metrics_from_json(app_tags, rdd, SPARK_RDD_METRICS)
+        for app_id, (app_name, tracking_url) in running_apps.items():
+
+            base_url = self._get_request_url(tracking_url)
+            response = self._rest_request_to_json(
+                base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'storage/rdd'
+            )
 
-            if rdds:
-                self._set_metric('spark.rdd.count', COUNT, len(rdds), app_tags)
+            tags = ['app_name:%s' % str(app_name)]
+            tags.extend(addl_tags)
+
+            for rdd in response:
+                self._set_metrics_from_json(tags, rdd, SPARK_RDD_METRICS)
+
+            if len(response):
+                self._set_metric('spark.rdd.count', COUNT, len(response), tags)
 
     def _spark_streaming_statistics_metrics(self, running_apps, addl_tags):
         """
         Get metrics for each application streaming statistics.
         """
-        try:
-            for stats, app_tags in self._describe_app('streaming/statistics', running_apps, addl_tags):
-                self.log.debug('streaming/statistics: %s', stats)
-                self._set_metrics_from_json(app_tags, stats, SPARK_STREAMING_STATISTICS_METRICS)
-        except HTTPError as e:
-            self.log.debug("Got an error collecting streaming/statistics", e, exc_info=True)
-            pass
+        for app_id, (app_name, tracking_url) in running_apps.items():
+            try:
+                base_url = self._get_request_url(tracking_url)
+                response = self._rest_request_to_json(
+                    base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, 'streaming/statistics'
+                )
+                self.log.debug('streaming/statistics: %s', response)
+                tags = ['app_name:%s' % str(app_name)]
+                tags.extend(addl_tags)
+
+                # NOTE: response is a dict
+                self._set_metrics_from_json(tags, response, SPARK_STREAMING_STATISTICS_METRICS)
+            except HTTPError as e:
+                # NOTE: If api call returns response 404
+                # then it means that the application is not a streaming application, we should skip metric submission
+                if e.response.status_code != 404:
+                    raise
 
     def _spark_structured_streams_metrics(self, running_apps, addl_tags):
         """
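
Every restored method funnels its HTTP call through _rest_request_to_json, whose body sits outside this hunk. A minimal sketch of such a helper, assuming it simply chains _rest_request (seen in the deleted _describe_app above) with response.json() — the actual implementation in spark.py may differ:

    # Assumed sketch, not part of this diff: request the endpoint, then parse
    # the payload. A JSONDecodeError now propagates to the caller, whereas the
    # reverted _describe_app logged it and skipped the app.
    def _rest_request_to_json(self, address, object_path, service_name, tags, *args, **kwargs):
        response = self._rest_request(address, object_path, service_name, tags, *args, **kwargs)
        return response.json()

This is why only _spark_streaming_statistics_metrics keeps a try/except: a 404 there just means the app is not a streaming application, while any other HTTP error or unparseable payload fails the check run.
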
2 changes: 1 addition & 1 deletion spark/tests/common.py
@@ -16,6 +16,7 @@
 EXPECTED_E2E_METRICS = [
     'spark.driver.total_shuffle_read',
     'spark.stage.num_active_tasks',
+    'spark.streaming.statistics.num_inactive_receivers',
     'spark.driver.max_memory',
     'spark.streaming.statistics.num_active_batches',
     'spark.driver.total_tasks',
@@ -90,7 +91,6 @@
     'spark.structured_streaming.input_rate',
     'spark.streaming.statistics.avg_input_rate',
     'spark.streaming.statistics.avg_scheduling_delay',
-    'spark.streaming.statistics.num_inactive_receivers',
     # The peak memory metrics are only available in Spark 3.0+ and after one loop of garbage collection
     'spark.driver.peak_mem.jvm_heap_memory',
     'spark.driver.peak_mem.jvm_off_heap_memory',
51 changes: 0 additions & 51 deletions spark/tests/test_spark.py
@@ -1248,57 +1248,6 @@ def test_no_running_apps(aggregator, dd_run_check, instance, service_check, capl
     assert 'No running apps found. No metrics will be collected.' in caplog.text
 
 
-@pytest.mark.unit
-@pytest.mark.parametrize(
-    'property_url, missing_metrics',
-    [
-        pytest.param(YARN_SPARK_JOB_URL, SPARK_JOB_RUNNING_METRIC_VALUES, id='jobs'),
-        pytest.param(YARN_SPARK_STAGE_URL, SPARK_STAGE_RUNNING_METRIC_VALUES, id='stages'),
-        pytest.param(
-            YARN_SPARK_EXECUTOR_URL,
-            SPARK_EXECUTOR_METRIC_VALUES.keys() | SPARK_EXECUTOR_LEVEL_METRIC_VALUES.keys(),
-            id='executors',
-        ),
-        pytest.param(YARN_SPARK_RDD_URL, SPARK_RDD_METRIC_VALUES, id='storage/rdd'),
-        pytest.param(
-            YARN_SPARK_STREAMING_STATISTICS_URL, SPARK_STREAMING_STATISTICS_METRIC_VALUES, id='streaming/statistics'
-        ),
-    ],
-)
-def test_yarn_no_json_for_app_properties(aggregator, dd_run_check, mocker, property_url, missing_metrics):
-    """
-    In some yarn deployments apps stop exposing properties (such as jobs and stages) by the time we query them.
-
-    In these cases we skip only the specific missing apps and metrics while collecting all others.
-    """
-
-    def get_without_json(url, *args, **kwargs):
-        arg_url = Url(url)
-        if arg_url == property_url:
-            return MockResponse(content="")  # this should trigger json error
-        return yarn_requests_get_mock(url, *args, **kwargs)
-
-    mocker.patch('requests.get', get_without_json)
-    dd_run_check(SparkCheck('spark', {}, [YARN_CONFIG]))
-    for m in missing_metrics:
-        aggregator.assert_metric(m, count=0)
-
-
-@pytest.mark.unit
-@pytest.mark.parametrize('status_code', [404, 500])
-def test_yarn_streaming_metrics_http_error(aggregator, dd_run_check, mocker, caplog, status_code):
-    def get_raise_error(url, *args, **kwargs):
-        arg_url = Url(url)
-        if arg_url == YARN_SPARK_STREAMING_STATISTICS_URL:
-            return MockResponse(status_code=status_code)
-        return yarn_requests_get_mock(url, *args, **kwargs)
-
-    mocker.patch('requests.get', get_raise_error)
-    dd_run_check(SparkCheck('spark', {}, [YARN_CONFIG]))
-    for m in SPARK_STREAMING_STATISTICS_METRIC_VALUES:
-        aggregator.assert_metric(m, count=0)
-
-
 class StandaloneAppsResponseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
     def do_GET(self):
         self.send_response(200)