[DOP-18570] Implement SparkMetricsRecorder #302

Merged: 1 commit, Aug 9, 2024
6 changes: 2 additions & 4 deletions .github/workflows/data/file-df/tracked.txt
@@ -1,6 +1,4 @@
.github/workflows/data/file-df/**
onetl/file_df_connection/spark_file_df_connection.py
onetl/file/file_df_reader/**
onetl/file/file_df_writer/**
onetl/file/__init__.py
tests/resources/file_df_connection/**
**/*file_df*
**/*file_df*/**
17 changes: 17 additions & 0 deletions onetl/_metrics/__init__.py
@@ -0,0 +1,17 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from onetl._metrics.command import SparkCommandMetrics
from onetl._metrics.driver import SparkDriverMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics
from onetl._metrics.output import SparkOutputMetrics
from onetl._metrics.recorder import SparkMetricsRecorder

__all__ = [
"SparkCommandMetrics",
"SparkDriverMetrics",
"SparkMetricsRecorder",
"SparkExecutorMetrics",
"SparkInputMetrics",
"SparkOutputMetrics",
]
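
Note (not part of the diff): a hypothetical usage sketch of the names exported here. The recorder implementation is not shown in this excerpt, so the context-manager form and the metrics() accessor below are assumptions, not the confirmed API.

# Hypothetical sketch only: the SparkMetricsRecorder API is assumed; "spark" is an existing SparkSession
from onetl._metrics import SparkMetricsRecorder

with SparkMetricsRecorder(spark) as recorder:
    df = spark.read.parquet("/some/input/path")
    df.write.format("noop").mode("overwrite").save()

command_metrics = recorder.metrics()  # assumed accessor returning SparkCommandMetrics
print(command_metrics.details)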
57 changes: 57 additions & 0 deletions onetl/_metrics/command.py
@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
import textwrap

try:
from pydantic.v1 import Field
except (ImportError, AttributeError):
from pydantic import Field # type: ignore[no-redef, assignment]

from onetl._metrics.driver import SparkDriverMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics
from onetl._metrics.output import SparkOutputMetrics
from onetl.impl import BaseModel

INDENT = " " * 4


class SparkCommandMetrics(BaseModel):
input: SparkInputMetrics = Field(default_factory=SparkInputMetrics)
output: SparkOutputMetrics = Field(default_factory=SparkOutputMetrics)
driver: SparkDriverMetrics = Field(default_factory=SparkDriverMetrics)
executor: SparkExecutorMetrics = Field(default_factory=SparkExecutorMetrics)

@property
def is_empty(self) -> bool:
return all([self.input.is_empty, self.output.is_empty])

def update(self, other: SparkCommandMetrics) -> SparkCommandMetrics:
self.input.update(other.input)
self.output.update(other.output)
self.driver.update(other.driver)
self.executor.update(other.executor)
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = []
if not self.input.is_empty:
result.append(f"Input:{os.linesep}{textwrap.indent(self.input.details, INDENT)}")
if not self.output.is_empty:
result.append(f"Output:{os.linesep}{textwrap.indent(self.output.details, INDENT)}")
if not self.driver.is_empty:
result.append(f"Driver:{os.linesep}{textwrap.indent(self.driver.details, INDENT)}")
if not self.executor.is_empty:
result.append(f"Executor:{os.linesep}{textwrap.indent(self.executor.details, INDENT)}")

return os.linesep.join(result)

def __str__(self):
return self.details
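
A minimal sketch (not part of the diff) of combining two SparkCommandMetrics snapshots; the values are made up for illustration.

from datetime import timedelta

from onetl._metrics.command import SparkCommandMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics

# two made-up per-command snapshots
first = SparkCommandMetrics(
    input=SparkInputMetrics(read_rows=1_000, read_bytes=50_000_000),
    executor=SparkExecutorMetrics(total_run_time=timedelta(seconds=12)),
)
second = SparkCommandMetrics(
    input=SparkInputMetrics(read_rows=500, read_bytes=25_000_000),
    executor=SparkExecutorMetrics(total_run_time=timedelta(seconds=6)),
)

# update() sums each sub-metric in place and returns the combined object;
# details renders indented "Input:" and "Executor:" sections, skipping the empty "Output:" and "Driver:"
print(first.update(second).details)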
39 changes: 39 additions & 0 deletions onetl/_metrics/driver.py
@@ -0,0 +1,39 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os

from humanize import naturalsize

from onetl.impl import BaseModel

# Metrics themselves are considered a part of the driver result,
# so ignore results smaller than 1MB
MIN_DRIVER_BYTES = 1_000_000


class SparkDriverMetrics(BaseModel):
in_memory_bytes: int = 0

@property
def is_empty(self) -> bool:
return self.in_memory_bytes < MIN_DRIVER_BYTES

def update(self, other: SparkDriverMetrics) -> SparkDriverMetrics:
self.in_memory_bytes += other.in_memory_bytes
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = []
if self.in_memory_bytes >= MIN_DRIVER_BYTES:
result.append(f"In-memory data (approximate): {naturalsize(self.in_memory_bytes)}")

return os.linesep.join(result)

def __str__(self):
return self.details
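
A short sketch (not part of the diff) of the MIN_DRIVER_BYTES cutoff: driver results below 1 MB are reported as "No data", since the collected metrics themselves count towards the result size.

from onetl._metrics.driver import SparkDriverMetrics

small = SparkDriverMetrics(in_memory_bytes=200_000)
print(small.is_empty)  # True: below the 1 MB threshold
print(small.details)   # "No data"

big = SparkDriverMetrics(in_memory_bytes=5_000_000)
print(big.details)     # roughly "In-memory data (approximate): 5.0 MB"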
54 changes: 54 additions & 0 deletions onetl/_metrics/executor.py
@@ -0,0 +1,54 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
from datetime import timedelta

from humanize import naturalsize, precisedelta

from onetl.impl import BaseModel


class SparkExecutorMetrics(BaseModel):
total_run_time: timedelta = timedelta()
total_cpu_time: timedelta = timedelta()
peak_memory_bytes: int = 0
memory_spilled_bytes: int = 0
disk_spilled_bytes: int = 0

@property
def is_empty(self) -> bool:
return not self.total_run_time

def update(self, other: SparkExecutorMetrics) -> SparkExecutorMetrics:
self.total_run_time += other.total_run_time
self.total_cpu_time += other.total_cpu_time
self.peak_memory_bytes += other.peak_memory_bytes
self.memory_spilled_bytes += other.memory_spilled_bytes
self.disk_spilled_bytes += other.disk_spilled_bytes
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = [
f"Total run time: {precisedelta(self.total_run_time)}",
f"Total CPU time: {precisedelta(self.total_cpu_time)}",
]

if self.peak_memory_bytes:
result.append(f"Peak memory: {naturalsize(self.peak_memory_bytes)}")

if self.memory_spilled_bytes:
result.append(f"Memory spilled: {naturalsize(self.memory_spilled_bytes)}")

if self.disk_spilled_bytes:
result.append(f"Disk spilled: {naturalsize(self.disk_spilled_bytes)}")

return os.linesep.join(result)

def __str__(self):
return self.details
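
A sketch (not part of the diff): is_empty keys off total_run_time only, and spill and peak-memory lines appear only when non-zero. Exact humanize output may vary slightly between versions.

from datetime import timedelta

from onetl._metrics.executor import SparkExecutorMetrics

metrics = SparkExecutorMetrics(
    total_run_time=timedelta(minutes=2),
    total_cpu_time=timedelta(seconds=90),
    memory_spilled_bytes=100_000_000,
)
print(metrics.is_empty)  # False: run time is non-zero
print(metrics.details)
# Total run time: 2 minutes
# Total CPU time: 1 minute and 30 seconds
# Memory spilled: 100.0 MB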
113 changes: 113 additions & 0 deletions onetl/_metrics/extract.py
@@ -0,0 +1,113 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import re
from datetime import timedelta
from typing import Any

try:
from pydantic.v1 import ByteSize
except (ImportError, AttributeError):
from pydantic import ByteSize # type: ignore[no-redef, assignment]

from onetl._metrics.command import SparkCommandMetrics
from onetl._metrics.driver import SparkDriverMetrics
from onetl._metrics.executor import SparkExecutorMetrics
from onetl._metrics.input import SparkInputMetrics
from onetl._metrics.listener.execution import (
SparkListenerExecution,
SparkSQLMetricNames,
)
from onetl._metrics.output import SparkOutputMetrics

# In some cases byte metrics have the format "7.6 MiB", but sometimes it is:
# total (min, med, max (stageId: taskId))\n7.6 MiB (0.0 B, 7.6 MiB, 7.6 MiB (driver))
NON_BYTE_SIZE = re.compile(r"^[^\d.]+|\(.*\)", flags=re.DOTALL)


def _get_int(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None:
try:
return int(data[key][0])
except Exception:
return None


def _get_bytes(data: dict[SparkSQLMetricNames, list[str]], key: Any) -> int | None:
try:
raw_value = data[key][0]
normalized_value = NON_BYTE_SIZE.sub("", raw_value)
return int(ByteSize.validate(normalized_value))
except Exception:
return None


def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCommandMetrics:
input_read_bytes: int = 0
input_read_rows: int = 0
output_bytes: int = 0
output_rows: int = 0

run_time_milliseconds: int = 0
cpu_time_nanoseconds: int = 0
peak_memory_bytes: int = 0
memory_spilled_bytes: int = 0
disk_spilled_bytes: int = 0
result_size_bytes: int = 0

# some metrics are per-stage, and have to be summed, others are per-execution
for job in execution.jobs:
for stage in job.stages:
input_read_bytes += stage.metrics.input_metrics.bytes_read
input_read_rows += stage.metrics.input_metrics.records_read
output_bytes += stage.metrics.output_metrics.bytes_written
output_rows += stage.metrics.output_metrics.records_written

run_time_milliseconds += stage.metrics.executor_run_time_milliseconds
cpu_time_nanoseconds += stage.metrics.executor_cpu_time_nanoseconds
peak_memory_bytes += stage.metrics.peak_execution_memory_bytes
memory_spilled_bytes += stage.metrics.memory_spilled_bytes
disk_spilled_bytes += stage.metrics.disk_spilled_bytes
result_size_bytes += stage.metrics.result_size_bytes

# https://github.com/apache/spark/blob/v3.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473
input_file_count = (
_get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ)
or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ)
or 0
)
input_raw_file_bytes = (
_get_bytes(execution.metrics, SparkSQLMetricNames.SIZE_OF_FILES_READ)
or _get_bytes(execution.metrics, SparkSQLMetricNames.STATIC_SIZE_OF_FILES_READ)
or 0
)
input_read_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_PARTITIONS_READ) or 0

output_files = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_WRITTEN_FILES) or 0
output_dynamic_partitions = _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_DYNAMIC_PART) or 0

return SparkCommandMetrics(
input=SparkInputMetrics(
read_rows=input_read_rows,
read_files=input_file_count,
read_bytes=input_read_bytes,
raw_file_bytes=input_raw_file_bytes,
read_partitions=input_read_partitions,
),
output=SparkOutputMetrics(
written_rows=output_rows,
written_bytes=output_bytes,
created_files=output_files,
created_partitions=output_dynamic_partitions,
),
driver=SparkDriverMetrics(
in_memory_bytes=result_size_bytes,
),
executor=SparkExecutorMetrics(
total_run_time=timedelta(milliseconds=run_time_milliseconds),
total_cpu_time=timedelta(microseconds=cpu_time_nanoseconds / 1000),
peak_memory_bytes=peak_memory_bytes,
memory_spilled_bytes=memory_spilled_bytes,
disk_spilled_bytes=disk_spilled_bytes,
),
)
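
A small sketch (not part of the diff) of what the NON_BYTE_SIZE regex does to the two value formats mentioned in the comment above, before the cleaned string is parsed by ByteSize.validate.

from onetl._metrics.extract import NON_BYTE_SIZE

# plain format is left untouched
print(NON_BYTE_SIZE.sub("", "7.6 MiB"))
# -> "7.6 MiB"

# multi-line "total (min, med, max ...)" format: the header prefix and the
# parenthesized per-stage breakdown are both stripped
raw = "total (min, med, max (stageId: taskId))\n7.6 MiB (0.0 B, 7.6 MiB, 7.6 MiB (driver))"
print(NON_BYTE_SIZE.sub("", raw))
# -> "7.6 MiB " (trailing space, which ByteSize parsing tolerates)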
55 changes: 55 additions & 0 deletions onetl/_metrics/input.py
@@ -0,0 +1,55 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
from pprint import pformat

from humanize import naturalsize

from onetl.impl import BaseModel


class SparkInputMetrics(BaseModel):
read_rows: int = 0
read_files: int = 0
read_partitions: int = 0
read_bytes: int = 0
raw_file_bytes: int = 0

@property
def is_empty(self) -> bool:
return not any([self.read_bytes, self.read_files, self.read_rows])

def update(self, other: SparkInputMetrics) -> SparkInputMetrics:
self.read_rows += other.read_rows
self.read_files += other.read_files
self.read_partitions += other.read_partitions
self.read_bytes += other.read_bytes
self.raw_file_bytes += other.raw_file_bytes
return self

@property
def details(self) -> str:
if self.is_empty:
return "No data"

result = []
result.append(f"Read rows: {pformat(self.read_rows)}")

if self.read_partitions:
result.append(f"Read partitions: {pformat(self.read_partitions)}")

if self.read_files:
result.append(f"Read files: {pformat(self.read_files)}")

if self.read_bytes:
result.append(f"Read size: {naturalsize(self.read_bytes)}")

if self.raw_file_bytes and self.read_bytes != self.raw_file_bytes:
result.append(f"Raw files size: {naturalsize(self.raw_file_bytes)}")

return os.linesep.join(result)

def __str__(self):
return self.details
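
A sketch (not part of the diff) of SparkInputMetrics rendering: the raw-files line only appears when it differs from the (decompressed) read size.

from onetl._metrics.input import SparkInputMetrics

metrics = SparkInputMetrics(
    read_rows=10_000,
    read_files=4,
    read_bytes=80_000_000,      # bytes after decompression
    raw_file_bytes=20_000_000,  # e.g. compressed files on disk
)
print(metrics.details)
# Read rows: 10000
# Read files: 4
# Read size: 80.0 MB
# Raw files size: 20.0 MB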
29 changes: 29 additions & 0 deletions onetl/_metrics/listener/__init__.py
@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from onetl._metrics.listener.execution import (
SparkListenerExecution,
SparkListenerExecutionStatus,
SparkSQLMetricNames,
)
from onetl._metrics.listener.job import SparkListenerJob, SparkListenerJobStatus
from onetl._metrics.listener.listener import SparkMetricsListener
from onetl._metrics.listener.stage import SparkListenerStage, SparkListenerStageStatus
from onetl._metrics.listener.task import (
SparkListenerTask,
SparkListenerTaskMetrics,
SparkListenerTaskStatus,
)

__all__ = [
"SparkListenerTask",
"SparkListenerTaskStatus",
"SparkListenerTaskMetrics",
"SparkListenerStage",
"SparkListenerStageStatus",
"SparkListenerJob",
"SparkListenerJobStatus",
"SparkListenerExecution",
"SparkListenerExecutionStatus",
"SparkSQLMetricNames",
"SparkMetricsListener",
]