Skip to content

Commit

Permalink
[DOP-18570] Implement SparkMetricsRecorder
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Aug 8, 2024
1 parent c4a9cb8 commit a81fdb7
Show file tree
Hide file tree
Showing 31 changed files with 1,940 additions and 7 deletions.
17 changes: 17 additions & 0 deletions onetl/_metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from onetl._metrics.command import SparkCommandMetrics
from onetl._metrics.driver import SparkDriverMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics
from onetl._metrics.output import SparkOutputMetrics
from onetl._metrics.recorder import SparkMetricsRecorder

__all__ = [
"SparkCommandMetrics",
"SparkDriverMetrics",
"SparkMetricsRecorder",
"SparkExecutorMetrics",
"SparkInputMetrics",
"SparkOutputMetrics",
]
57 changes: 57 additions & 0 deletions onetl/_metrics/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
import textwrap

try:
from pydantic.v1 import Field
except (ImportError, AttributeError):
from pydantic import Field # type: ignore[no-redef, assignment]

from onetl._metrics.driver import SparkDriverMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics
from onetl._metrics.output import SparkOutputMetrics
from onetl.impl import BaseModel

INDENT = " " * 4


class SparkCommandMetrics(BaseModel):
input: SparkInputMetrics = Field(default_factory=SparkInputMetrics)
output: SparkOutputMetrics = Field(default_factory=SparkOutputMetrics)
driver: SparkDriverMetrics = Field(default_factory=SparkDriverMetrics)
executor: SparkExecutorMetrics = Field(default_factory=SparkExecutorMetrics)

@property
def is_empty(self) -> bool:
return all([self.input.is_empty, self.output.is_empty])

def update(self, other: SparkCommandMetrics) -> SparkCommandMetrics:
self.input.update(other.input)
self.output.update(other.output)
self.driver.update(other.driver)
self.executor.update(other.executor)
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = []
if not self.input.is_empty:
result.append(f"Input:{os.linesep}{textwrap.indent(self.input.details, INDENT)}")
if not self.output.is_empty:
result.append(f"Output:{os.linesep}{textwrap.indent(self.output.details, INDENT)}")
if not self.driver.is_empty:
result.append(f"Driver:{os.linesep}{textwrap.indent(self.driver.details, INDENT)}")
if not self.executor.is_empty:
result.append(f"Executor:{os.linesep}{textwrap.indent(self.executor.details, INDENT)}")

return os.linesep.join(result)

def __str__(self):
return self.details
39 changes: 39 additions & 0 deletions onetl/_metrics/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os

from humanize import naturalsize

from onetl.impl import BaseModel

# Metrics themselves are considered a part of driver result,
# ignore if result is smaller than 1MB
MIN_DRIVER_BYTES = 1_000_000


class SparkDriverMetrics(BaseModel):
in_memory_bytes: int = 0

@property
def is_empty(self) -> bool:
return self.in_memory_bytes < MIN_DRIVER_BYTES

def update(self, other: SparkDriverMetrics) -> SparkDriverMetrics:
self.in_memory_bytes += other.in_memory_bytes
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = []
if self.in_memory_bytes >= MIN_DRIVER_BYTES:
result.append(f"In-memory data (approximate): {naturalsize(self.in_memory_bytes)}")

return os.linesep.join(result)

def __str__(self):
return self.details
54 changes: 54 additions & 0 deletions onetl/_metrics/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
from datetime import timedelta

from humanize import naturalsize, precisedelta

from onetl.impl import BaseModel


class SparkExecutorMetrics(BaseModel):
total_run_time: timedelta = timedelta()
total_cpu_time: timedelta = timedelta()
peak_memory_bytes: int = 0
memory_spilled_bytes: int = 0
disk_spilled_bytes: int = 0

@property
def is_empty(self) -> bool:
return not self.total_run_time

def update(self, other: SparkExecutorMetrics) -> SparkExecutorMetrics:
self.total_run_time += other.total_run_time
self.total_cpu_time += other.total_cpu_time
self.peak_memory_bytes += other.peak_memory_bytes
self.memory_spilled_bytes += other.memory_spilled_bytes
self.disk_spilled_bytes += other.disk_spilled_bytes
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = [
f"Total run time: {precisedelta(self.total_run_time)}",
f"Total CPU time: {precisedelta(self.total_cpu_time)}",
]

if self.peak_memory_bytes:
result.append(f"Peak memory: {naturalsize(self.peak_memory_bytes)}")

if self.memory_spilled_bytes:
result.append(f"Memory spilled: {naturalsize(self.memory_spilled_bytes)}")

if self.disk_spilled_bytes:
result.append(f"Disk spilled: {naturalsize(self.disk_spilled_bytes)}")

return os.linesep.join(result)

def __str__(self):
return self.details
117 changes: 117 additions & 0 deletions onetl/_metrics/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import re
from datetime import timedelta
from typing import Any

try:
from pydantic.v1 import ByteSize
except (ImportError, AttributeError):
from pydantic import ByteSize # type: ignore[no-redef, assignment]

from onetl._metrics.command import SparkCommandMetrics
from onetl._metrics.driver import SparkDriverMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics
from onetl._metrics.listener.execution import (
SparkListenerExecution,
SparkSQLMetricNames,
)
from onetl._metrics.output import SparkOutputMetrics

NON_DIGIT = re.compile(r"[^\d.]")


def _get_int(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None:
if key not in data:
return None

items = data[key]
if not items:
return None

Check warning on line 33 in onetl/_metrics/extract.py

View check run for this annotation

Codecov / codecov/patch

onetl/_metrics/extract.py#L33

Added line #L33 was not covered by tests

return int(items[0])


def _get_bytes(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None:
if key not in data:
return None

items = data[key]
if not items:
return None

Check warning on line 44 in onetl/_metrics/extract.py

View check run for this annotation

Codecov / codecov/patch

onetl/_metrics/extract.py#L44

Added line #L44 was not covered by tests

return int(ByteSize.validate(items[0]))


def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCommandMetrics:
input_read_bytes: int = 0
input_read_rows: int = 0
output_bytes: int = 0
output_rows: int = 0

run_time_milliseconds: int = 0
cpu_time_nanoseconds: int = 0
peak_memory_bytes: int = 0
memory_spilled_bytes: int = 0
disk_spilled_bytes: int = 0
result_size_bytes: int = 0

# some metrics are per-stage, and have to be summed, others are per-execution
for job in execution.jobs:
for stage in job.stages:
input_read_bytes += stage.metrics.input_metrics.bytes_read
input_read_rows += stage.metrics.input_metrics.records_read
output_bytes += stage.metrics.output_metrics.bytes_written
output_rows += stage.metrics.output_metrics.records_written

run_time_milliseconds += stage.metrics.executor_run_time_milliseconds
cpu_time_nanoseconds += stage.metrics.executor_cpu_time_nanoseconds
peak_memory_bytes += stage.metrics.peak_execution_memory_bytes
memory_spilled_bytes += stage.metrics.memory_spilled_bytes
disk_spilled_bytes += stage.metrics.disk_spilled_bytes
result_size_bytes += stage.metrics.result_size_bytes

# https://github.com/apache/spark/blob/v3.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473
input_file_count = (
_get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ)
or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ)
or 0
)
input_raw_file_bytes = (
_get_bytes(execution.metrics, SparkSQLMetricNames.SIZE_OF_FILES_READ)
or _get_bytes(execution.metrics, SparkSQLMetricNames.STATIC_SIZE_OF_FILES_READ)
or 0
)
input_read_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_PARTITIONS_READ) or 0

output_files = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_WRITTEN_FILES) or 0
output_dynamic_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_DYNAMIC_PART) or 0

return SparkCommandMetrics(
input=SparkInputMetrics(
read_rows=input_read_rows,
read_files=input_file_count,
read_bytes=input_read_bytes,
raw_file_bytes=input_raw_file_bytes,
read_partitions=input_read_partitions,
),
output=SparkOutputMetrics(
written_rows=output_rows,
written_bytes=output_bytes,
created_files=output_files,
created_partitions=output_dynamic_partitions,
),
driver=SparkDriverMetrics(
in_memory_bytes=result_size_bytes,
),
executor=SparkExecutorMetrics(
total_run_time=timedelta(milliseconds=run_time_milliseconds),
total_cpu_time=timedelta(microseconds=cpu_time_nanoseconds / 1000),
peak_memory_bytes=peak_memory_bytes,
memory_spilled_bytes=memory_spilled_bytes,
disk_spilled_bytes=disk_spilled_bytes,
),
)
55 changes: 55 additions & 0 deletions onetl/_metrics/input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
from pprint import pformat

from humanize import naturalsize

from onetl.impl import BaseModel


class SparkInputMetrics(BaseModel):
read_rows: int = 0
read_files: int = 0
read_partitions: int = 0
read_bytes: int = 0
raw_file_bytes: int = 0

@property
def is_empty(self) -> bool:
return not any([self.read_bytes, self.read_files, self.read_rows])

def update(self, other: SparkInputMetrics) -> SparkInputMetrics:
self.read_rows += other.read_rows
self.read_files += other.read_files
self.read_partitions += other.read_partitions
self.read_bytes += other.read_bytes
self.raw_file_bytes += other.raw_file_bytes
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = []
result.append(f"Read rows: {pformat(self.read_rows)}")

if self.read_partitions:
result.append(f"Read partitions: {pformat(self.read_partitions)}")

if self.read_files:
result.append(f"Read files: {pformat(self.read_files)}")

if self.read_bytes:
result.append(f"Read size: {naturalsize(self.read_bytes)}")

if self.raw_file_bytes and self.read_bytes != self.raw_file_bytes:
result.append(f"Raw files size: {naturalsize(self.raw_file_bytes)}")

return os.linesep.join(result)

def __str__(self):
return self.details
29 changes: 29 additions & 0 deletions onetl/_metrics/listener/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from onetl._metrics.listener.execution import (
SparkListenerExecution,
SparkListenerExecutionStatus,
SparkSQLMetricNames,
)
from onetl._metrics.listener.job import SparkListenerJob, SparkListenerJobStatus
from onetl._metrics.listener.listener import SparkMetricsListener
from onetl._metrics.listener.stage import SparkListenerStage, SparkListenerStageStatus
from onetl._metrics.listener.task import (
SparkListenerTask,
SparkListenerTaskMetrics,
SparkListenerTaskStatus,
)

__all__ = [
"SparkListenerTask",
"SparkListenerTaskStatus",
"SparkListenerTaskMetrics",
"SparkListenerStage",
"SparkListenerStageStatus",
"SparkListenerJob",
"SparkListenerJobStatus",
"SparkListenerExecution",
"SparkListenerExecutionStatus",
"SparkSQLMetricNames",
"SparkMetricsListener",
]
Loading

0 comments on commit a81fdb7

Please sign in to comment.