-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DOP-18570] Implement SparkMetricsRecorder
- Loading branch information
Showing
32 changed files
with
2,261 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from onetl.metrics.command import SparkCommandMetrics | ||
from onetl.metrics.driver import SparkDriverMetrics | ||
from onetl.metrics.executor import SparkExecutorMetrics | ||
from onetl.metrics.input import SparkInputMetrics | ||
from onetl.metrics.output import SparkOutputMetrics | ||
from onetl.metrics.recorder import SparkMetricsRecorder | ||
|
||
__all__ = [ | ||
"SparkCommandMetrics", | ||
"SparkMetricsRecorder", | ||
"SparkExecutorMetrics", | ||
"SparkInputMetrics", | ||
"SparkOutputMetrics", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from __future__ import annotations | ||
|
||
import re | ||
from datetime import timedelta | ||
from typing import Any | ||
|
||
try: | ||
from pydantic.v1 import ByteSize | ||
except (ImportError, AttributeError): | ||
from pydantic import ByteSize # type: ignore[no-redef, assignment] | ||
|
||
from onetl.metrics._listener.execution import ( | ||
SparkListenerExecution, | ||
SparkSQLMetricNames, | ||
) | ||
from onetl.metrics.command import SparkCommandMetrics | ||
from onetl.metrics.driver import SparkDriverMetrics | ||
from onetl.metrics.executor import SparkExecutorMetrics | ||
from onetl.metrics.input import SparkInputMetrics | ||
from onetl.metrics.output import SparkOutputMetrics | ||
|
||
NON_DIGIT = re.compile(r"[^\d.]") | ||
|
||
|
||
def _get_int(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None: | ||
if key not in data: | ||
return None | ||
|
||
items = data[key] | ||
if not items: | ||
return None | ||
|
||
return int(items[0]) | ||
|
||
|
||
def _get_bytes(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None: | ||
if key not in data: | ||
return None | ||
|
||
items = data[key] | ||
if not items: | ||
return None | ||
|
||
return int(ByteSize.validate(items[0])) | ||
|
||
|
||
def _get_time(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> timedelta | None: # noqa: Found | ||
if key not in data: | ||
return None | ||
|
||
items = data[key] | ||
if not items: | ||
return None | ||
|
||
str_value = items[0] | ||
digits = NON_DIGIT.sub("", str_value) | ||
# reverse of msDurationToString: | ||
# https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1243-L1257 | ||
if str_value.endswith(" ms"): | ||
return timedelta(milliseconds=float(digits)) | ||
if str_value.endswith(" s"): | ||
return timedelta(seconds=float(digits)) | ||
if str_value.endswith(" m"): | ||
return timedelta(minutes=float(digits)) | ||
return timedelta(hours=float(digits)) | ||
|
||
|
||
def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCommandMetrics: | ||
input_read_bytes: int = 0 | ||
input_read_rows: int = 0 | ||
output_bytes: int = 0 | ||
output_rows: int = 0 | ||
|
||
run_time_milliseconds: int = 0 | ||
cpu_time_nanoseconds: int = 0 | ||
peak_memory_bytes: int = 0 | ||
memory_spilled_bytes: int = 0 | ||
disk_spilled_bytes: int = 0 | ||
result_size_bytes: int = 0 | ||
|
||
# some metrics are per-stage, and have to be summed, others are per-execution | ||
for job in execution.jobs: | ||
for stage in job.stages: | ||
input_read_bytes += stage.metrics.input_metrics.bytes_read | ||
input_read_rows += stage.metrics.input_metrics.records_read | ||
output_bytes += stage.metrics.output_metrics.bytes_written | ||
output_rows += stage.metrics.output_metrics.records_written | ||
|
||
run_time_milliseconds += stage.metrics.executor_run_time_milliseconds | ||
cpu_time_nanoseconds += stage.metrics.executor_cpu_time_nanoseconds | ||
peak_memory_bytes = max(peak_memory_bytes, stage.metrics.peak_execution_memory_bytes) | ||
memory_spilled_bytes += stage.metrics.memory_spilled_bytes | ||
disk_spilled_bytes += stage.metrics.disk_spilled_bytes | ||
result_size_bytes += stage.metrics.result_size_bytes | ||
|
||
# https://github.com/apache/spark/blob/v3.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473 | ||
input_file_count = ( | ||
_get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ) | ||
or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ) | ||
or 0 | ||
) | ||
input_raw_file_bytes = ( | ||
_get_bytes(execution.metrics, SparkSQLMetricNames.SIZE_OF_FILES_READ) | ||
or _get_bytes(execution.metrics, SparkSQLMetricNames.STATIC_SIZE_OF_FILES_READ) | ||
or 0 | ||
) | ||
input_read_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_PARTITIONS_READ) or 0 | ||
|
||
input_query_time = _get_time(execution.metrics, SparkSQLMetricNames.JDBC_QUERY_EXECUTION_TIME) or timedelta(0) | ||
|
||
output_files = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_WRITTEN_FILES) or 0 | ||
output_dynamic_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_DYNAMIC_PART) or 0 | ||
|
||
return SparkCommandMetrics( | ||
input=SparkInputMetrics( | ||
read_rows=input_read_rows, | ||
read_files=input_file_count, | ||
read_bytes=input_read_bytes, | ||
raw_file_bytes=input_raw_file_bytes, | ||
read_partitions=input_read_partitions, | ||
query_time=input_query_time, | ||
), | ||
output=SparkOutputMetrics( | ||
written_rows=output_rows, | ||
written_bytes=output_bytes, | ||
created_files=output_files, | ||
created_partitions=output_dynamic_partitions, | ||
), | ||
driver=SparkDriverMetrics( | ||
in_memory_bytes=result_size_bytes, | ||
), | ||
executor=SparkExecutorMetrics( | ||
total_run_time=timedelta(milliseconds=run_time_milliseconds), | ||
total_cpu_time=timedelta(microseconds=cpu_time_nanoseconds / 1000), | ||
peak_memory_bytes=peak_memory_bytes, | ||
memory_spilled_bytes=memory_spilled_bytes, | ||
disk_spilled_bytes=disk_spilled_bytes, | ||
), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems) | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from onetl.metrics._listener.execution import ( | ||
SparkListenerExecution, | ||
SparkListenerExecutionStatus, | ||
SparkSQLMetricNames, | ||
) | ||
from onetl.metrics._listener.job import SparkListenerJob, SparkListenerJobStatus | ||
from onetl.metrics._listener.listener import SparkMetricsListener | ||
from onetl.metrics._listener.stage import SparkListenerStage, SparkListenerStageStatus | ||
from onetl.metrics._listener.task import ( | ||
SparkListenerTask, | ||
SparkListenerTaskMetrics, | ||
SparkListenerTaskStatus, | ||
) | ||
|
||
__all__ = [ | ||
"SparkListenerTask", | ||
"SparkListenerTaskStatus", | ||
"SparkListenerTaskMetrics", | ||
"SparkListenerStage", | ||
"SparkListenerStageStatus", | ||
"SparkListenerJob", | ||
"SparkListenerJobStatus", | ||
"SparkListenerExecution", | ||
"SparkListenerExecutionStatus", | ||
"SparkSQLMetricNames", | ||
"SparkMetricsListener", | ||
] |
Oops, something went wrong.