Skip to content

Commit

Permalink
[DOP-18570] Collect Spark metrics in DBWriter and FileDFWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Aug 6, 2024
1 parent c4a9cb8 commit bf1888c
Show file tree
Hide file tree
Showing 24 changed files with 1,237 additions and 21 deletions.
34 changes: 34 additions & 0 deletions onetl/_util/java.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

from typing import TYPE_CHECKING

from onetl._util.spark import get_spark_version
from onetl._util.version import Version

if TYPE_CHECKING:
from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
Expand All @@ -24,3 +27,34 @@ def try_import_java_class(spark_session: SparkSession, name: str):
klass = getattr(gateway.jvm, name)
gateway.help(klass, display=False)
return klass


def start_callback_server(spark_session: SparkSession):
    """
    Make sure the Py4J callback server of the Spark session's gateway is running.

    The callback server is what allows the JVM to call back into Python objects,
    e.g. Spark Listener implementations written in Python.

    Parameters
    ----------
    spark_session : SparkSession
        Session whose Java gateway should have a running callback server.
    """
    gateway = get_java_gateway(spark_session)

    if get_spark_version(spark_session) >= Version("2.4"):
        # Delegate to the helper bundled with pyspark.
        from pyspark.java_gateway import ensure_callback_server_started

        ensure_callback_server_started(gateway)
        return

    # Spark 2.3 and older: the pyspark helper is only imported for >= 2.4 above,
    # so the callback server is started manually here.
    already_running = "_callback_server" in gateway.__dict__ and gateway._callback_server is not None
    if already_running:
        return

    from py4j.java_gateway import JavaObject

    server_params = gateway.callback_server_parameters
    server_params.eager_load = True
    server_params.daemonize = True
    server_params.daemonize_connections = True
    # port 0 = take any free port; the real one is read back from the socket below
    server_params.port = 0
    gateway.start_callback_server(server_params)

    callback_server = gateway._callback_server
    real_port = callback_server.server_socket.getsockname()[1]
    callback_server.port = real_port
    # gateway with real port
    gateway._python_proxy_port = callback_server.port

    # get the GatewayServer object in JVM by ID
    jvm_gateway_server = JavaObject("GATEWAY_SERVER", gateway._gateway_client)
    # update the port of CallbackClient with real port
    callback_address = jvm_gateway_server.getCallbackClient().getAddress()
    jvm_gateway_server.resetCallbackClient(callback_address, gateway._python_proxy_port)
7 changes: 7 additions & 0 deletions onetl/_util/scala.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ def get_default_scala_version(spark_version: Version) -> Version:
if spark_version.major < 3:
return Version("2.11")
return Version("2.12")


def scala_seq_to_python_list(seq) -> list:
    """
    Copy items of a Scala ``Seq``-like object into a new Python list.

    The object is read through its ``size()`` and ``apply(index)`` methods,
    items are returned in index order.
    """
    return [seq.apply(index) for index in range(seq.size())]
3 changes: 2 additions & 1 deletion onetl/base/base_db_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

if TYPE_CHECKING:
from etl_entities.hwm import HWM
from pyspark.sql import DataFrame
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructField, StructType


Expand Down Expand Up @@ -106,6 +106,7 @@ class BaseDBConnection(BaseConnection):
Implements generic methods for reading and writing dataframe from/to database-like source
"""

spark: SparkSession
Dialect = BaseDBDialect

@property
Expand Down
4 changes: 3 additions & 1 deletion onetl/base/base_file_df_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from onetl.base.pure_path_protocol import PurePathProtocol

if TYPE_CHECKING:
from pyspark.sql import DataFrame, DataFrameReader, DataFrameWriter
from pyspark.sql import DataFrame, DataFrameReader, DataFrameWriter, SparkSession
from pyspark.sql.types import StructType


Expand Down Expand Up @@ -72,6 +72,8 @@ class BaseFileDFConnection(BaseConnection):
.. versionadded:: 0.9.0
"""

spark: SparkSession

@abstractmethod
def check_if_format_supported(
self,
Expand Down
1 change: 1 addition & 0 deletions onetl/db/db_writer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from onetl.db.db_writer.db_writer import DBWriter
from onetl.db.db_writer.result import DBWriterResult
44 changes: 35 additions & 9 deletions onetl/db/db_writer/db_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from logging import getLogger
from typing import TYPE_CHECKING, Optional

from onetl.db.db_writer.result import DBWriterResult
from onetl.metrics.collector import SparkMetricsCollector

try:
from pydantic.v1 import Field, PrivateAttr, validator
except (ImportError, AttributeError):
Expand All @@ -16,6 +19,7 @@
from onetl.log import (
entity_boundary_log,
log_dataframe_schema,
log_lines,
log_options,
log_with_indent,
)
Expand Down Expand Up @@ -172,7 +176,7 @@ def validate_options(cls, options, values):
return None

@slot
def run(self, df: DataFrame):
def run(self, df: DataFrame) -> DBWriterResult:
"""
Method for writing your df to specified target. |support_hooks|
Expand All @@ -185,33 +189,50 @@ def run(self, df: DataFrame):
df : pyspark.sql.dataframe.DataFrame
Spark dataframe
Returns
-------
:obj:`DBWriterResult <onetl.db.db_writer.result.DBWriterResult>`
DBWriter result object
Examples
--------
Write df to target:
Write dataframe to target:
.. code:: python
writer.run(df)
result = writer.run(df)
"""
if df.isStreaming:
raise ValueError(f"DataFrame is streaming. {self.__class__.__name__} supports only batch DataFrames.")

entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")

if not self._connection_checked:
self._log_parameters()
log_dataframe_schema(log, df)
self.connection.check()
self._connection_checked = True

self.connection.write_df_to_target(
df=df,
target=str(self.target),
**self._get_write_kwargs(),
)
with SparkMetricsCollector(self.connection.spark) as collector:
try:
self.connection.write_df_to_target(
df=df,
target=str(self.target),
**self._get_write_kwargs(),
)
except Exception:
log.error(
"|%s| Error while writing dataframe. Target may contain partially written data!",
self.__class__.__name__,
)
raise
finally:
result = DBWriterResult(metrics=collector.recorded_metrics)
self._log_result(result)

entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
return result

def _log_parameters(self) -> None:
log.info("|Spark| -> |%s| Writing DataFrame to target using parameters:", self.connection.__class__.__name__)
Expand All @@ -225,3 +246,8 @@ def _get_write_kwargs(self) -> dict:
return {"options": self.options}

return {}

def _log_result(self, result: DBWriterResult) -> None:
    """Write an empty separator line, a header with the class name, and ``str(result)`` to the log."""
    writer_name = self.__class__.__name__
    log_with_indent(log, "")
    log.info("|%s| Write result:", writer_name)
    result_text = str(result)
    log_lines(log, result_text)
116 changes: 116 additions & 0 deletions onetl/db/db_writer/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import os
import textwrap

from onetl.impl import BaseModel
from onetl.metrics.metrics import SparkMetrics

INDENT = " " * 4


class DBWriterResult(BaseModel):
    """
    Representation of DBWriter result.

    .. versionadded:: 0.12.0

    Examples
    --------

    >>> from onetl.db import DBWriter
    >>> writer = DBWriter(...)
    >>> result = writer.run(df)
    >>> result
    DBWriterResult(
        metrics=SparkMetrics(
            input=SparkInputMetrics(
                read_rows=1_000,
                read_files=10,
                read_bytes=1_000_000,
                scan_bytes=2_000_000,
                read_partitions=3,
                dynamic_partition_pruning=True,
            ),
            output=SparkOutputMetrics(
                written_rows=1_000,
                created_files=10,
                written_bytes=1_000_000,
                created_dynamic_partitions=1,
            ),
            executor=SparkExecutorMetrics(
                run_time_milliseconds=1_000,
                cpu_time_nanoseconds=2_000_000_000,
                peak_memory_bytes=1_000_000_000,
            ),
        )
    )
    """

    # Defaults to an empty SparkMetrics so that ``DBWriterResult()`` (used in the
    # ``details`` example) is valid instead of failing on a missing required field.
    # pydantic copies field defaults for every instance, so results do not share
    # this object.
    # NOTE(review): assumes SparkMetrics() can be constructed without arguments — confirm.
    metrics: SparkMetrics = SparkMetrics()

    @property
    def details(self) -> str:
        """
        Return summarized information about the result object.

        Returns ``"Metrics: No data"`` if the metrics are empty, otherwise
        a ``"Metrics:"`` header followed by the indented metrics details.

        Examples
        --------

        >>> from onetl.db.db_writer import DBWriterResult
        >>> from onetl.metrics import SparkMetrics, SparkOutputMetrics, SparkInputMetrics, SparkExecutorMetrics
        >>> result1 = DBWriterResult(
        ...     metrics=SparkMetrics(
        ...         input=SparkInputMetrics(
        ...             read_rows=1_000,
        ...             read_files=10,
        ...             read_bytes=1_000_000,
        ...             scan_bytes=2_000_000,
        ...             read_partitions=3,
        ...             dynamic_partition_pruning=True,
        ...         ),
        ...         output=SparkOutputMetrics(
        ...             written_bytes=1_000_000,
        ...             written_rows=1_000,
        ...             created_files=10,
        ...             created_dynamic_partitions=1,
        ...         ),
        ...         executor=SparkExecutorMetrics(
        ...             run_time_milliseconds=1_000,
        ...             cpu_time_nanoseconds=2_000_000_000,
        ...             peak_memory_bytes=1_000_000_000,
        ...         ),
        ...     )
        ... )
        >>> print(result1.details)
        Metrics:
            Input:
                Read rows: 1000
                Read files: 10
                Read size: 1.0 MB
                Scan size: 2.0 MB
                Dynamic partition pruning: True
                Read partitions: 3
            Output:
                Written rows: 1000
                Created files: 10
                Written size: 1.0 MB
                Created dynamic partitions: 1
            Executor:
                Run time: 1.0 ms
                CPU time: 2.0 ms
                Peak memory: 1.0 MB
        >>> result2 = DBWriterResult()
        >>> print(result2.details)
        Metrics: No data
        """
        if self.metrics.is_empty:
            return "Metrics: No data"

        return "Metrics:" + os.linesep + textwrap.indent(self.metrics.details, INDENT)

    def __str__(self):
        """Same as :obj:`onetl.db.db_writer.result.DBWriterResult.details`"""
        return self.details
35 changes: 28 additions & 7 deletions onetl/file/file_df_writer/file_df_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import logging
from typing import TYPE_CHECKING

from onetl.db.db_writer.result import DBWriterResult
from onetl.metrics.collector import SparkMetricsCollector

try:
from pydantic.v1 import PrivateAttr, validator
except (ImportError, AttributeError):
Expand All @@ -17,6 +20,7 @@
from onetl.log import (
entity_boundary_log,
log_dataframe_schema,
log_lines,
log_options,
log_with_indent,
)
Expand Down Expand Up @@ -93,7 +97,7 @@ class FileDFWriter(FrozenModel):
_connection_checked: bool = PrivateAttr(default=False)

@slot
def run(self, df: DataFrame) -> None:
def run(self, df: DataFrame) -> DBWriterResult:
"""
Method for writing DataFrame as files. |support_hooks|
Expand Down Expand Up @@ -125,14 +129,26 @@ def run(self, df: DataFrame) -> None:
self.connection.check()
self._connection_checked = True

self.connection.write_df_as_files(
df=df,
path=self.target_path,
format=self.format,
options=self.options,
)
with SparkMetricsCollector(self.connection.spark) as collector:
try:
self.connection.write_df_as_files(
df=df,
path=self.target_path,
format=self.format,
options=self.options,
)
except Exception:
log.error(
"|%s| Error while writing dataframe. Target may contain partially written data!",
self.__class__.__name__,
)
raise
finally:
result = DBWriterResult(metrics=collector.recorded_metrics)
self._log_result(result)

entity_boundary_log(log, f"{self.__class__.__name__}.run() ends", char="-")
return result

def _log_parameters(self, df: DataFrame) -> None:
log.info("|Spark| -> |%s| Writing dataframe using parameters:", self.connection.__class__.__name__)
Expand All @@ -143,6 +159,11 @@ def _log_parameters(self, df: DataFrame) -> None:
log_options(log, options_dict)
log_dataframe_schema(log, df)

def _log_result(self, result: DBWriterResult) -> None:
    """Write an empty separator line, a header with the class name, and ``str(result)`` to the log."""
    writer_name = self.__class__.__name__
    log_with_indent(log, "")
    log.info("|%s| Write result:", writer_name)
    result_text = str(result)
    log_lines(log, result_text)

@validator("target_path", pre=True)
def _validate_target_path(cls, target_path, values):
connection: BaseFileDFConnection = values["connection"]
Expand Down
15 changes: 15 additions & 0 deletions onetl/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
# SPDX-License-Identifier: Apache-2.0
from onetl.metrics.collector import SparkMetricsCollector
from onetl.metrics.executor import SparkExecutorMetrics
from onetl.metrics.input import SparkInputMetrics
from onetl.metrics.metrics import SparkMetrics
from onetl.metrics.output import SparkOutputMetrics

__all__ = [
"SparkMetrics",
"SparkMetricsCollector",
"SparkExecutorMetrics",
"SparkInputMetrics",
"SparkOutputMetrics",
]
Loading

0 comments on commit bf1888c

Please sign in to comment.