Merge branch 'develop'
dolfinus committed May 29, 2024
2 parents 2335d7f + 90ba720 commit 7c9c708
Showing 32 changed files with 303 additions and 120 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/data/db/tracked.txt
@@ -1,5 +1,6 @@
.github/workflows/data/db/**
onetl/db_connection/db_connection.py
onetl/db_connection/jdbc*.py
onetl/db_connection/db_connection/*
onetl/db_connection/dialect_mixins/*
onetl/db_connection/jdbc_connection/*
onetl/db_connection/jdbc_mixin/*
onetl/db/**
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -52,7 +52,7 @@ repos:
- --no-extra-eol

- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
rev: v2.3.0
hooks:
- id: codespell
args: [-w]
2 changes: 1 addition & 1 deletion README.rst
@@ -341,7 +341,7 @@ Read data from MSSQL, transform & write to Hive.
database="Telecom",
spark=spark,
# These options are passed to MSSQL JDBC Driver:
extra={"ApplicationIntent": "ReadOnly"},
extra={"applicationIntent": "ReadOnly"},
).check()
# >>> INFO:|MSSQL| Connection is available
82 changes: 41 additions & 41 deletions docs/changelog/0.11.0.rst
@@ -28,41 +28,6 @@ most of users will not see any differences.

This brings few bugfixes with datetime format handling.

- Serialize ``ColumnDatetimeHWM`` to Clickhouse's ``DateTime64(6)`` (precision up to microseconds) instead of ``DateTime`` (precision up to seconds) (:github:pull:`267`).

In previous onETL versions, ``ColumnDatetimeHWM`` value was rounded to the second, and thus reading some rows that were read in previous runs,
producing duplicates.

For Clickhouse versions below 21.1 comparing column of type ``DateTime`` with a value of type ``DateTime64`` is not supported, returning an empty dataframe.
To avoid this, replace:

.. code:: python
DBReader(
...,
hwm=DBReader.AutoDetectHWM(
name="my_hwm",
expression="hwm_column", # <--
),
)
with:

.. code:: python
DBReader(
...,
hwm=DBReader.AutoDetectHWM(
name="my_hwm",
expression="CAST(hwm_column AS DateTime64)", # <-- add explicit CAST
),
)
- Pass JDBC connection extra params as ``properties`` dict instead of URL with query part (:github:pull:`268`).

This allows passing custom connection parameters like ``Clickhouse(extra={"custom_http_options": "option1=value1,option2=value2"})``
without need to apply urlencode to parameter value, like ``option1%3Dvalue1%2Coption2%3Dvalue2``.

- For JDBC connections add new ``SQLOptions`` class for ``DB.sql(query, options=...)`` method (:github:pull:`272`).

Firsly, to keep naming more consistent.
@@ -166,6 +131,41 @@ most of users will not see any differences.
For now, ``DB.fetch(query, options=...)`` and ``DB.execute(query, options=...)`` can accept ``JDBCOptions``, to keep backward compatibility,
but emit a deprecation warning. The old class will be removed in ``v1.0.0``.

- Serialize ``ColumnDatetimeHWM`` to Clickhouse's ``DateTime64(6)`` (precision up to microseconds) instead of ``DateTime`` (precision up to seconds) (:github:pull:`267`).

In previous onETL versions, ``ColumnDatetimeHWM`` value was rounded to the second, and thus reading some rows that were read in previous runs,
producing duplicates.

For Clickhouse versions below 21.1 comparing column of type ``DateTime`` with a value of type ``DateTime64`` is not supported, returning an empty dataframe.
To avoid this, replace:

.. code:: python
DBReader(
...,
hwm=DBReader.AutoDetectHWM(
name="my_hwm",
expression="hwm_column", # <--
),
)
with:

.. code:: python
DBReader(
...,
hwm=DBReader.AutoDetectHWM(
name="my_hwm",
expression="CAST(hwm_column AS DateTime64)", # <-- add explicit CAST
),
)
- Pass JDBC connection extra params as ``properties`` dict instead of URL with query part (:github:pull:`268`).

This allows passing custom connection parameters like ``Clickhouse(extra={"custom_http_options": "option1=value1,option2=value2"})``
without need to apply urlencode to parameter value, like ``option1%3Dvalue1%2Coption2%3Dvalue2``.
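For illustration, a minimal sketch of a connection built after this change — the host, credentials and the pre-created ``spark`` session below are assumptions, not part of this commit:

.. code:: python

    from onetl.connection import Clickhouse

    # hypothetical connection parameters, shown only to illustrate the new behavior:
    # values inside ``extra`` are now passed to the JDBC driver as a properties dict,
    # so "option1=value1,option2=value2" no longer needs to be urlencoded.
    clickhouse = Clickhouse(
        host="clickhouse.domain.com",
        user="someuser",
        password="***",
        database="default",
        spark=spark,  # assumed existing SparkSession with the Clickhouse JDBC driver
        extra={"custom_http_options": "option1=value1,option2=value2"},
    )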

Features
--------

@@ -188,14 +188,14 @@ Improve user experience with Kafka messages and Database tables with serialized
* ``CSV.parse_column(col, schema=...)`` (:github:pull:`258`).
* ``XML.parse_column(col, schema=...)`` (:github:pull:`269`).

This allows parsing data in ``value`` field of Kafka message or string/binary column of some table as a nested Spark structure.
This allows parsing data in ``value`` field of Kafka message or string/binary column of some table as a nested Spark structure.

- Add ``FileFormat.serialize_column(...)`` method to several classes:
* ``Avro.serialize_column(col)`` (:github:pull:`265`).
* ``JSON.serialize_column(col)`` (:github:pull:`257`).
* ``CSV.serialize_column(col)`` (:github:pull:`258`).
* ``Avro.serialize_column(col)`` (:github:pull:`265`).
* ``JSON.serialize_column(col)`` (:github:pull:`257`).
* ``CSV.serialize_column(col)`` (:github:pull:`258`).

This allows saving Spark nested structures or arrays to ``value`` field of Kafka message or string/binary column of some table.
This allows saving Spark nested structures or arrays to ``value`` field of Kafka message or string/binary column of some table.
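For illustration, a rough sketch of how these methods might be combined on a Kafka ``value`` column — the ``kafka_df`` DataFrame, the payload schema, the import path and the column aliases are assumptions, not taken from this commit:

.. code:: python

    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    from onetl.file.format import JSON  # assumed import path for the JSON format class

    # assumed schema of the JSON payload stored in the Kafka ``value`` field
    payload_schema = StructType(
        [
            StructField("id", IntegerType()),
            StructField("name", StringType()),
        ]
    )

    json = JSON()

    # parse the string/binary ``value`` column into a nested struct column
    parsed_df = kafka_df.select(json.parse_column("value", payload_schema).alias("value"))

    # and back: serialize the struct column into a JSON string suitable for writing to Kafka
    serialized_df = parsed_df.select(json.serialize_column("value").alias("value"))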

Improvements
------------
@@ -220,7 +220,7 @@ Few documentation improvements.

- Add note about connecting to Clickhouse cluster. (:github:pull:`280`).

- Add notes about versions when specific class/method/attribute/argument was added, renamed or changed behavior (:github:`282`).
- Add notes about versions when specific class/method/attribute/argument was added, renamed or changed behavior (:github:pull:`282`).


Bug Fixes
13 changes: 13 additions & 0 deletions docs/changelog/0.11.1.rst
@@ -0,0 +1,13 @@
0.11.1 (2024-05-29)
===================

Features
--------

- Change ``MSSQL.port`` default from ``1433`` to ``None``, allowing use of ``instanceName`` to detect port number. (:github:pull:`287`)


Bug Fixes
---------

- Remove ``fetchsize`` from ``JDBC.WriteOptions``. (:github:pull:`288`)
1 change: 1 addition & 0 deletions docs/changelog/index.rst
@@ -3,6 +3,7 @@
:caption: Changelog

DRAFT
0.11.1
0.11.0
0.10.2
0.10.1
11 changes: 9 additions & 2 deletions docs/connection/db_connection/hive/write.rst
@@ -21,8 +21,15 @@ Examples
# Use the Hive partitioning columns to group the data. Create only 20 files per Hive partition.
# Also sort the data by column which most data is correlated with, reducing files size.
write_df = df.repartition(
20, "country", "business_date", "user_id"
).sortWithinPartitions("country", "business_date", "user_id")
20,
"country",
"business_date",
"user_id",
).sortWithinPartitions(
"country",
"business_date",
"user_id",
)
writer = DBWriter(
connection=hive,
1 change: 1 addition & 0 deletions docs/connection/db_connection/kafka/prerequisites.rst
@@ -46,6 +46,7 @@ Authentication mechanism
~~~~~~~~~~~~~~~~~~~~~~~~

Kafka can support different authentication mechanism (also known as `SASL <https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer>`_).

List of currently supported mechanisms:
* :obj:`PLAIN <onetl.connection.db_connection.kafka.kafka_basic_auth.KafkaBasicAuth>`. To no confuse this with ``PLAINTEXT`` connection protocol, onETL uses name ``BasicAuth``.
* :obj:`GSSAPI <onetl.connection.db_connection.kafka.kafka_kerberos_auth.KafkaKerberosAuth>`. To simplify naming, onETL uses name ``KerberosAuth``.
4 changes: 3 additions & 1 deletion docs/connection/db_connection/mssql/prerequisites.rst
@@ -27,9 +27,11 @@ Connecting to MSSQL
Connection port
~~~~~~~~~~~~~~~

Connection is usually performed to port 1443. Port may differ for different MSSQL instances.
Connection is usually performed to port 1433. Port may differ for different MSSQL instances.
Please ask your MSSQL administrator to provide required information.

For named MSSQL instances (``instanceName`` option), `port number is optional <https://learn.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url?view=sql-server-ver16#named-and-multiple-sql-server-instances>`_, and could be omitted.
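For illustration, a hedged sketch of connecting to a named instance without an explicit port — host, credentials and instance name below are assumptions:

.. code:: python

    from onetl.connection import MSSQL

    # with ``instanceName`` set, ``port`` can be omitted (it now defaults to ``None``),
    # and the port is resolved via the SQL Server Browser service, as described
    # in the Microsoft documentation linked above.
    mssql = MSSQL(
        host="mssql.domain.com",
        user="someuser",
        password="***",
        database="Telecom",
        spark=spark,  # assumed existing SparkSession with the MSSQL JDBC driver
        extra={"instanceName": "myinstance"},
    )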

Connection host
~~~~~~~~~~~~~~~

2 changes: 1 addition & 1 deletion docs/connection/db_connection/mssql/types.rst
@@ -35,7 +35,7 @@ This is how MSSQL connector performs this:
it will be populated by MSSQL.
.. [2]
This is true only if DataFrame column is a ``StringType()``, because text value is parsed automatically to tagret column type.
This is true only if DataFrame column is a ``StringType()``, because text value is parsed automatically to target column type.
But other types cannot be silently converted, like ``int -> text``. This requires explicit casting, see `DBWriter`_.
12 changes: 6 additions & 6 deletions docs/connection/file_df_connection/spark_s3/troubleshooting.rst
@@ -43,7 +43,7 @@ Resulting logs will look like this

.. dropdown:: See log

.. code:: txt
.. code:: text
23/08/03 11:25:10 DEBUG S3AFileSystem: Using S3ABlockOutputStream with buffer = disk; block=67108864; queue limit=4
23/08/03 11:25:10 DEBUG S3Guard: Metastore option source [core-default.xml]
@@ -193,7 +193,7 @@ Most common mistakes
No network access
^^^^^^^^^^^^^^^^^

.. code:: txt
.. code:: text
Caused by: java.net.ConnectException: Connection refused
@@ -206,7 +206,7 @@ Mostly caused by:
Using HTTPS protocol for HTTP port
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: txt
.. code:: text
Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message
@@ -226,7 +226,7 @@ You should pass protocol explicitly:
SSL certificate is self-signed
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: txt
.. code:: text
sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
@@ -249,7 +249,7 @@ But is is **NOT** recommended.
Accessing S3 without domain-style access style support
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: txt
.. code:: text
Caused by: java.net.UnknownHostException: my-bucket.s3provider.com
@@ -285,7 +285,7 @@ There are `different types of committers <https://hadoop.apache.org/docs/stable/

This committer is quite slow and unstable, so it is not recommended to use:

.. code:: txt
.. code:: text
WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
4 changes: 2 additions & 2 deletions docs/logging.rst
@@ -3,7 +3,7 @@
Logging
=======

Logging is quite important to understant what's going on under the hood of onETL.
Logging is quite important to understand what's going on under the hood of onETL.

Default logging level for Python interpreters is ``WARNING``,
but most of onETL logs are in ``INFO`` level, so users usually don't see much.
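For illustration, a minimal sketch of raising the log level with the standard library — the ``onetl`` logger name is assumed to follow the package name, and the ``setup_logging`` helper mentioned in the comments is an assumed import path, not shown in this diff:

.. code:: python

    import logging

    # plain stdlib approach: onETL logs at INFO, so raise the level explicitly
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("onetl").setLevel(logging.INFO)

    # onETL also ships its own helper which additionally tweaks formatting,
    # producing output like the excerpt below (assumed import path):
    # from onetl.log import setup_logging
    # setup_logging()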
@@ -69,7 +69,7 @@ This changes both log level and log formatting to something like this:
2024-04-12 10:12:12,190 [INFO ] MainThread: host = 'mssql.host'
2024-04-12 10:12:12,190 [INFO ] MainThread: port = 1433
2024-04-12 10:12:12,191 [INFO ] MainThread: database = 'somedb'
2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'ApplicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'applicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
2024-04-12 10:12:12,191 [INFO ] MainThread: jdbc_url = 'jdbc:sqlserver:/mssql.host:1433'
2024-04-12 10:12:12,579 [INFO ] MainThread: |MSSQL| Connection is available.
2024-04-12 10:12:12,581 [INFO ] MainThread: |MSSQL| Executing SQL query (on driver):
2 changes: 1 addition & 1 deletion onetl/VERSION
@@ -1 +1 @@
0.11.0
0.11.1
7 changes: 7 additions & 0 deletions onetl/connection/db_connection/clickhouse/connection.py
@@ -5,6 +5,8 @@
import logging
from typing import ClassVar, Optional

from etl_entities.instance import Host

from onetl._util.classproperty import classproperty
from onetl._util.version import Version
from onetl.connection.db_connection.clickhouse.dialect import ClickhouseDialect
@@ -106,6 +108,7 @@ class Clickhouse(JDBCConnection):
"""

host: Host
port: int = 8123
database: Optional[str] = None
extra: ClickhouseExtra = ClickhouseExtra()
@@ -189,6 +192,10 @@ def jdbc_params(self) -> dict:
result.update(self.extra.dict(by_alias=True))
return result

@property
def instance_url(self) -> str:
return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}"

@staticmethod
def _build_statement(
statement: str,
16 changes: 9 additions & 7 deletions onetl/connection/db_connection/greenplum/connection.py
@@ -10,6 +10,8 @@

from etl_entities.instance import Host

from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions

try:
from pydantic.v1 import validator
except (ImportError, AttributeError):
@@ -346,17 +348,17 @@ def get_df_schema(
self,
source: str,
columns: list[str] | None = None,
options: JDBCOptions | None = None,
options: JDBCReadOptions | None = None,
) -> StructType:
log.info("|%s| Fetching schema of table %r ...", self.__class__.__name__, source)

query = self.dialect.get_sql_query(source, columns=columns, limit=0, compact=True)
jdbc_options = self.JDBCOptions.parse(options).copy(update={"fetchsize": 0})
jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 0})

log.debug("|%s| Executing SQL query (on driver):", self.__class__.__name__)
log_lines(log, query, level=logging.DEBUG)

df = self._query_on_driver(query, jdbc_options)
df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict()))
log.info("|%s| Schema fetched.", self.__class__.__name__)

return df.schema
@@ -368,10 +370,10 @@ def get_min_max_values(
window: Window,
hint: Any | None = None,
where: Any | None = None,
options: JDBCOptions | None = None,
options: JDBCReadOptions | None = None,
) -> tuple[Any, Any]:
log.info("|%s| Getting min and max values for %r ...", self.__class__.__name__, window.expression)
jdbc_options = self.JDBCOptions.parse(options).copy(update={"fetchsize": 1})
jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 1})

query = self.dialect.get_sql_query(
table=source,
@@ -391,7 +393,7 @@
log.info("|%s| Executing SQL query (on driver):", self.__class__.__name__)
log_lines(log, query)

df = self._query_on_driver(query, jdbc_options)
df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict()))
row = df.collect()[0]
min_value = row["min"]
max_value = row["max"]
@@ -437,7 +439,7 @@ def _connector_params(
**extra,
}

def _options_to_connection_properties(self, options: JDBCOptions | JDBCExecuteOptions | JDBCFetchOptions):
def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions):
# See https://github.com/pgjdbc/pgjdbc/pull/1252
# Since 42.2.9 Postgres JDBC Driver added new option readOnlyMode=transaction
# Which is not a desired behavior, because `.fetch()` method should always be read-only