From c8680997b07a22de3ad2fe0e5160b3eeb2106909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 09:49:27 +0000 Subject: [PATCH 01/12] [DOP-16174] Bump version --- onetl/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onetl/VERSION b/onetl/VERSION index d9df1bbc0..af88ba824 100644 --- a/onetl/VERSION +++ b/onetl/VERSION @@ -1 +1 @@ -0.11.0 +0.11.1 From 0f0d0c6097566f5c67813ae490cc2b39d8cb66c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 09:50:19 +0000 Subject: [PATCH 02/12] [DOP-16174] Update CHANGELOG --- docs/changelog/0.11.0.rst | 70 +++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/docs/changelog/0.11.0.rst b/docs/changelog/0.11.0.rst index 8abdc2b7a..f6eb6d46a 100644 --- a/docs/changelog/0.11.0.rst +++ b/docs/changelog/0.11.0.rst @@ -28,41 +28,6 @@ most of users will not see any differences. This brings few bugfixes with datetime format handling. -- Serialize ``ColumnDatetimeHWM`` to Clickhouse's ``DateTime64(6)`` (precision up to microseconds) instead of ``DateTime`` (precision up to seconds) (:github:pull:`267`). - - In previous onETL versions, ``ColumnDatetimeHWM`` value was rounded to the second, and thus reading some rows that were read in previous runs, - producing duplicates. - - For Clickhouse versions below 21.1 comparing column of type ``DateTime`` with a value of type ``DateTime64`` is not supported, returning an empty dataframe. - To avoid this, replace: - - .. code:: python - - DBReader( - ..., - hwm=DBReader.AutoDetectHWM( - name="my_hwm", - expression="hwm_column", # <-- - ), - ) - - with: - - .. code:: python - - DBReader( - ..., - hwm=DBReader.AutoDetectHWM( - name="my_hwm", - expression="CAST(hwm_column AS DateTime64)", # <-- add explicit CAST - ), - ) - -- Pass JDBC connection extra params as ``properties`` dict instead of URL with query part (:github:pull:`268`). - - This allows passing custom connection parameters like ``Clickhouse(extra={"custom_http_options": "option1=value1,option2=value2"})`` - without need to apply urlencode to parameter value, like ``option1%3Dvalue1%2Coption2%3Dvalue2``. - - For JDBC connections add new ``SQLOptions`` class for ``DB.sql(query, options=...)`` method (:github:pull:`272`). Firsly, to keep naming more consistent. @@ -166,6 +131,41 @@ most of users will not see any differences. For now, ``DB.fetch(query, options=...)`` and ``DB.execute(query, options=...)`` can accept ``JDBCOptions``, to keep backward compatibility, but emit a deprecation warning. The old class will be removed in ``v1.0.0``. +- Serialize ``ColumnDatetimeHWM`` to Clickhouse's ``DateTime64(6)`` (precision up to microseconds) instead of ``DateTime`` (precision up to seconds) (:github:pull:`267`). + + In previous onETL versions, ``ColumnDatetimeHWM`` value was rounded to the second, and thus reading some rows that were read in previous runs, + producing duplicates. + + For Clickhouse versions below 21.1 comparing column of type ``DateTime`` with a value of type ``DateTime64`` is not supported, returning an empty dataframe. + To avoid this, replace: + + .. 
code:: python + + DBReader( + ..., + hwm=DBReader.AutoDetectHWM( + name="my_hwm", + expression="hwm_column", # <-- + ), + ) + + with: + + .. code:: python + + DBReader( + ..., + hwm=DBReader.AutoDetectHWM( + name="my_hwm", + expression="CAST(hwm_column AS DateTime64)", # <-- add explicit CAST + ), + ) + +- Pass JDBC connection extra params as ``properties`` dict instead of URL with query part (:github:pull:`268`). + + This allows passing custom connection parameters like ``Clickhouse(extra={"custom_http_options": "option1=value1,option2=value2"})`` + without need to apply urlencode to parameter value, like ``option1%3Dvalue1%2Coption2%3Dvalue2``. + Features -------- From fc9c75018e98b89c31157a77a4f0832fb73b6d6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 11:02:53 +0000 Subject: [PATCH 03/12] [DOP-16174] Update CHANGELOG --- docs/changelog/0.11.0.rst | 12 ++++++------ .../connection/db_connection/kafka/prerequisites.rst | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/changelog/0.11.0.rst b/docs/changelog/0.11.0.rst index f6eb6d46a..10a3e99b6 100644 --- a/docs/changelog/0.11.0.rst +++ b/docs/changelog/0.11.0.rst @@ -188,14 +188,14 @@ Improve user experience with Kafka messages and Database tables with serialized * ``CSV.parse_column(col, schema=...)`` (:github:pull:`258`). * ``XML.parse_column(col, schema=...)`` (:github:pull:`269`). - This allows parsing data in ``value`` field of Kafka message or string/binary column of some table as a nested Spark structure. + This allows parsing data in ``value`` field of Kafka message or string/binary column of some table as a nested Spark structure. - Add ``FileFormat.serialize_column(...)`` method to several classes: - * ``Avro.serialize_column(col)`` (:github:pull:`265`). - * ``JSON.serialize_column(col)`` (:github:pull:`257`). - * ``CSV.serialize_column(col)`` (:github:pull:`258`). + * ``Avro.serialize_column(col)`` (:github:pull:`265`). + * ``JSON.serialize_column(col)`` (:github:pull:`257`). + * ``CSV.serialize_column(col)`` (:github:pull:`258`). - This allows saving Spark nested structures or arrays to ``value`` field of Kafka message or string/binary column of some table. + This allows saving Spark nested structures or arrays to ``value`` field of Kafka message or string/binary column of some table. Improvements ------------ @@ -220,7 +220,7 @@ Few documentation improvements. - Add note about connecting to Clickhouse cluster. (:github:pull:`280`). -- Add notes about versions when specific class/method/attribute/argument was added, renamed or changed behavior (:github:`282`). +- Add notes about versions when specific class/method/attribute/argument was added, renamed or changed behavior (:github:pull:`282`). Bug Fixes diff --git a/docs/connection/db_connection/kafka/prerequisites.rst b/docs/connection/db_connection/kafka/prerequisites.rst index d6e0ac99d..29f5885b5 100644 --- a/docs/connection/db_connection/kafka/prerequisites.rst +++ b/docs/connection/db_connection/kafka/prerequisites.rst @@ -46,6 +46,7 @@ Authentication mechanism ~~~~~~~~~~~~~~~~~~~~~~~~ Kafka can support different authentication mechanism (also known as `SASL `_). + List of currently supported mechanisms: * :obj:`PLAIN `. To no confuse this with ``PLAINTEXT`` connection protocol, onETL uses name ``BasicAuth``. * :obj:`GSSAPI `. 
To simplify naming, onETL uses name ``KerberosAuth``. From 742391082d99ae6724f336e15b220477a5b5f17d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 12:56:17 +0000 Subject: [PATCH 04/12] [DOP-16270] Allow passing instanceName to MSSQL(extra=...) --- README.rst | 2 +- docs/changelog/next_release/287.feature.rst | 1 + .../db_connection/mssql/prerequisites.rst | 4 +- docs/logging.rst | 2 +- .../db_connection/clickhouse/connection.py | 7 +++ .../jdbc_connection/connection.py | 9 ---- .../db_connection/mssql/connection.py | 51 ++++++++++++++++--- .../db_connection/mysql/connection.py | 7 +++ .../db_connection/oracle/connection.py | 7 ++- .../db_connection/postgres/connection.py | 5 +- .../db_connection/teradata/connection.py | 7 +++ .../test_clickhouse_unit.py | 2 + .../test_greenplum_unit.py | 4 ++ .../test_mssql_unit.py | 46 ++++++++++++++--- .../test_mysql_unit.py | 6 +++ .../test_oracle_unit.py | 6 +++ .../test_postgres_unit.py | 4 ++ .../test_teradata_unit.py | 4 ++ 18 files changed, 146 insertions(+), 28 deletions(-) create mode 100644 docs/changelog/next_release/287.feature.rst diff --git a/README.rst b/README.rst index da0b84cb6..4f97ce087 100644 --- a/README.rst +++ b/README.rst @@ -341,7 +341,7 @@ Read data from MSSQL, transform & write to Hive. database="Telecom", spark=spark, # These options are passed to MSSQL JDBC Driver: - extra={"ApplicationIntent": "ReadOnly"}, + extra={"applicationIntent": "ReadOnly"}, ).check() # >>> INFO:|MSSQL| Connection is available diff --git a/docs/changelog/next_release/287.feature.rst b/docs/changelog/next_release/287.feature.rst new file mode 100644 index 000000000..e2f09341f --- /dev/null +++ b/docs/changelog/next_release/287.feature.rst @@ -0,0 +1 @@ +Change ``MSSQL.port`` default from ``1433`` to ``None``, allowing use of ``instanceName`` to detect port number. diff --git a/docs/connection/db_connection/mssql/prerequisites.rst b/docs/connection/db_connection/mssql/prerequisites.rst index 33786d61c..8dde0f6c3 100644 --- a/docs/connection/db_connection/mssql/prerequisites.rst +++ b/docs/connection/db_connection/mssql/prerequisites.rst @@ -27,9 +27,11 @@ Connecting to MSSQL Connection port ~~~~~~~~~~~~~~~ -Connection is usually performed to port 1443. Port may differ for different MSSQL instances. +Connection is usually performed to port 1433. Port may differ for different MSSQL instances. Please ask your MSSQL administrator to provide required information. +For named MSSQL instances (``instanceName`` option), `port number is optional `_, and could be omitted. 
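+
+For example, a connection to a named instance may be created like this (a minimal sketch: host, credentials, database and instance name are placeholders, and ``spark`` is an existing SparkSession with MSSQL JDBC driver loaded):
+
+.. code:: python
+
+    from onetl.connection import MSSQL
+
+    mssql = MSSQL(
+        host="database.host.or.ip",
+        # no port - the driver discovers it using the instance name
+        user="user",
+        password="*****",
+        database="database",
+        extra={"instanceName": "myinstance"},
+        spark=spark,
+    )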
+ Connection host ~~~~~~~~~~~~~~~ diff --git a/docs/logging.rst b/docs/logging.rst index 1198a64d1..8b7138452 100644 --- a/docs/logging.rst +++ b/docs/logging.rst @@ -69,7 +69,7 @@ This changes both log level and log formatting to something like this: 2024-04-12 10:12:12,190 [INFO ] MainThread: host = 'mssql.host' 2024-04-12 10:12:12,190 [INFO ] MainThread: port = 1433 2024-04-12 10:12:12,191 [INFO ] MainThread: database = 'somedb' - 2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'ApplicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'} + 2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'applicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'} 2024-04-12 10:12:12,191 [INFO ] MainThread: jdbc_url = 'jdbc:sqlserver:/mssql.host:1433' 2024-04-12 10:12:12,579 [INFO ] MainThread: |MSSQL| Connection is available. 2024-04-12 10:12:12,581 [INFO ] MainThread: |MSSQL| Executing SQL query (on driver): diff --git a/onetl/connection/db_connection/clickhouse/connection.py b/onetl/connection/db_connection/clickhouse/connection.py index 0097f2862..0ca6d0ce7 100644 --- a/onetl/connection/db_connection/clickhouse/connection.py +++ b/onetl/connection/db_connection/clickhouse/connection.py @@ -5,6 +5,8 @@ import logging from typing import ClassVar, Optional +from etl_entities.instance import Host + from onetl._util.classproperty import classproperty from onetl._util.version import Version from onetl.connection.db_connection.clickhouse.dialect import ClickhouseDialect @@ -106,6 +108,7 @@ class Clickhouse(JDBCConnection): """ + host: Host port: int = 8123 database: Optional[str] = None extra: ClickhouseExtra = ClickhouseExtra() @@ -189,6 +192,10 @@ def jdbc_params(self) -> dict: result.update(self.extra.dict(by_alias=True)) return result + @property + def instance_url(self) -> str: + return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}" + @staticmethod def _build_statement( statement: str, diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py index 586b63924..d8d03cc6b 100644 --- a/onetl/connection/db_connection/jdbc_connection/connection.py +++ b/onetl/connection/db_connection/jdbc_connection/connection.py @@ -7,8 +7,6 @@ import warnings from typing import TYPE_CHECKING, Any -from etl_entities.instance import Host - from onetl._internal import clear_statement from onetl.connection.db_connection.db_connection import DBConnection from onetl.connection.db_connection.jdbc_connection.dialect import JDBCDialect @@ -47,19 +45,12 @@ @support_hooks class JDBCConnection(JDBCMixin, DBConnection): - host: Host - port: int - Dialect = JDBCDialect ReadOptions = JDBCReadOptions SQLOptions = JDBCSQLOptions WriteOptions = JDBCWriteOptions Options = JDBCLegacyOptions - @property - def instance_url(self) -> str: - return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}" - @slot def sql( self, diff --git a/onetl/connection/db_connection/mssql/connection.py b/onetl/connection/db_connection/mssql/connection.py index 2ef1f1f69..945103536 100644 --- a/onetl/connection/db_connection/mssql/connection.py +++ b/onetl/connection/db_connection/mssql/connection.py @@ -3,7 +3,9 @@ from __future__ import annotations import warnings -from typing import ClassVar +from typing import ClassVar, Optional + +from etl_entities.instance import Host from onetl._util.classproperty import classproperty from onetl._util.version import Version @@ -92,6 +94,7 @@ class MSSQL(JDBCConnection): # Create connection mssql = 
MSSQL( host="database.host.or.ip", + port=1433, user="user", password="*****", extra={ @@ -110,17 +113,38 @@ class MSSQL(JDBCConnection): # Create connection mssql = MSSQL( host="database.host.or.ip", + port=1433, user="user", password="*****", extra={ - "Domain": "some.domain.com", # add here your domain - "IntegratedSecurity": "true", + "domain": "some.domain.com", # add here your domain + "integratedSecurity": "true", "authenticationScheme": "NTLM", "trustServerCertificate": "true", # add this to avoid SSL certificate issues }, spark=spark, ) + MSSQL connection with instance name: + + .. code:: python + + # Create Spark session with MSSQL driver loaded + ... + + # Create connection + mssql = MSSQL( + host="database.host.or.ip", + # !!! no port !!! + user="user", + password="*****", + extra={ + "instanceName": "myinstance", # add here your instance name + "trustServerCertificate": "true", # add this to avoid SSL certificate issues + }, + spark=spark, + ) + MSSQL read-only connection: .. code:: python @@ -131,10 +155,11 @@ class MSSQL(JDBCConnection): # Create connection mssql = MSSQL( host="database.host.or.ip", + port=1433, user="user", password="*****", extra={ - "ApplicationIntent": "ReadOnly", # driver will open read-only connection, to avoid writing to the database + "applicationIntent": "ReadOnly", # driver will open read-only connection, to avoid writing to the database "trustServerCertificate": "true", # add this to avoid SSL certificate issues }, spark=spark, @@ -143,7 +168,8 @@ class MSSQL(JDBCConnection): """ database: str - port: int = 1433 + host: Host + port: Optional[int] = None extra: MSSQLExtra = MSSQLExtra() ReadOptions = MSSQLReadOptions @@ -215,7 +241,11 @@ def package(cls) -> str: @property def jdbc_url(self) -> str: - return f"jdbc:sqlserver://{self.host}:{self.port}" + if self.port: + # automatic port discovery, like used with custom instanceName + # https://learn.microsoft.com/en-us/sql/connect/jdbc/building-the-connection-url?view=sql-server-ver16#named-and-multiple-sql-server-instances + return f"jdbc:sqlserver://{self.host}:{self.port}" + return f"jdbc:sqlserver://{self.host}" @property def jdbc_params(self) -> dict: @@ -226,4 +256,11 @@ def jdbc_params(self) -> dict: @property def instance_url(self) -> str: - return f"{super().instance_url}/{self.database}" + extra_dict = self.extra.dict(by_alias=True) + instance_name = extra_dict.get("instanceName") + if instance_name: + return rf"{self.__class__.__name__.lower()}://{self.host}\{instance_name}/{self.database}" + + # for backward compatibility keep port number in legacy HWM instance url + port = self.port or 1433 + return f"{self.__class__.__name__.lower()}://{self.host}:{port}/{self.database}" diff --git a/onetl/connection/db_connection/mysql/connection.py b/onetl/connection/db_connection/mysql/connection.py index 4e37d1d52..72090d585 100644 --- a/onetl/connection/db_connection/mysql/connection.py +++ b/onetl/connection/db_connection/mysql/connection.py @@ -5,6 +5,8 @@ import warnings from typing import ClassVar, Optional +from etl_entities.instance import Host + from onetl._util.classproperty import classproperty from onetl._util.version import Version from onetl.connection.db_connection.jdbc_connection import JDBCConnection @@ -103,6 +105,7 @@ class MySQL(JDBCConnection): """ + host: Host port: int = 3306 database: Optional[str] = None extra: MySQLExtra = MySQLExtra() @@ -168,3 +171,7 @@ def jdbc_params(self) -> dict: result = super().jdbc_params result.update(self.extra.dict(by_alias=True)) return result + 
+ @property + def instance_url(self) -> str: + return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}" diff --git a/onetl/connection/db_connection/oracle/connection.py b/onetl/connection/db_connection/oracle/connection.py index fd0f9c1f7..8ca1b6ef1 100644 --- a/onetl/connection/db_connection/oracle/connection.py +++ b/onetl/connection/db_connection/oracle/connection.py @@ -17,6 +17,8 @@ except (ImportError, AttributeError): from pydantic import root_validator # type: ignore[no-redef, assignment] +from etl_entities.instance import Host + from onetl._internal import clear_statement from onetl._util.classproperty import classproperty from onetl._util.version import Version @@ -177,6 +179,7 @@ class Oracle(JDBCConnection): """ + host: Host port: int = 1521 sid: Optional[str] = None service_name: Optional[str] = None @@ -259,9 +262,9 @@ def jdbc_params(self) -> dict: @property def instance_url(self) -> str: if self.sid: - return f"{super().instance_url}/{self.sid}" + return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}/{self.sid}" - return f"{super().instance_url}/{self.service_name}" + return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}/{self.service_name}" @slot def get_min_max_values( diff --git a/onetl/connection/db_connection/postgres/connection.py b/onetl/connection/db_connection/postgres/connection.py index 44a36eb6e..a79465613 100644 --- a/onetl/connection/db_connection/postgres/connection.py +++ b/onetl/connection/db_connection/postgres/connection.py @@ -5,6 +5,8 @@ import warnings from typing import ClassVar +from etl_entities.instance import Host + from onetl._util.classproperty import classproperty from onetl._util.version import Version from onetl.connection.db_connection.jdbc_connection import JDBCConnection @@ -112,6 +114,7 @@ class Postgres(JDBCConnection): """ + host: Host database: str port: int = 5432 extra: PostgresExtra = PostgresExtra() @@ -178,7 +181,7 @@ def jdbc_params(self) -> dict[str, str]: @property def instance_url(self) -> str: - return f"{super().instance_url}/{self.database}" + return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}/{self.database}" def _options_to_connection_properties( self, diff --git a/onetl/connection/db_connection/teradata/connection.py b/onetl/connection/db_connection/teradata/connection.py index 3ca789a13..d6ea76acc 100644 --- a/onetl/connection/db_connection/teradata/connection.py +++ b/onetl/connection/db_connection/teradata/connection.py @@ -5,6 +5,8 @@ import warnings from typing import ClassVar, Optional +from etl_entities.instance import Host + from onetl._internal import stringify from onetl._util.classproperty import classproperty from onetl._util.version import Version @@ -123,6 +125,7 @@ class Teradata(JDBCConnection): """ + host: Host port: int = 1025 database: Optional[str] = None extra: TeradataExtra = TeradataExtra() @@ -201,3 +204,7 @@ def jdbc_url(self) -> str: connection_params.append(f"{key}={string_value}") return f"jdbc:teradata://{self.host}/{','.join(connection_params)}" + + @property + def instance_url(self) -> str: + return f"{self.__class__.__name__.lower()}://{self.host}:{self.port}" diff --git a/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py b/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py index 4ce55a572..ff36e0a66 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py @@ -131,6 +131,8 @@ def 
test_clickhouse(spark_mock): assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + assert conn.instance_url == "clickhouse://some_host:8123" + def test_clickhouse_with_port(spark_mock): conn = Clickhouse( diff --git a/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py b/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py index f3b996140..0d382d44d 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py @@ -132,6 +132,8 @@ def test_greenplum(spark_mock): assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + assert conn.instance_url == "greenplum://some_host:5432/database" + def test_greenplum_with_port(spark_mock): conn = Greenplum(host="some_host", port=5000, user="user", database="database", password="passwd", spark=spark_mock) @@ -153,6 +155,8 @@ def test_greenplum_with_port(spark_mock): "tcpKeepAlive": "true", } + assert conn.instance_url == "greenplum://some_host:5000/database" + def test_greenplum_without_database_error(spark_mock): with pytest.raises(ValueError, match="field required"): diff --git a/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py b/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py index 3e3f81496..e1a18aa9d 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py @@ -86,26 +86,28 @@ def test_mssql(spark_mock): conn = MSSQL(host="some_host", user="user", database="database", password="passwd", spark=spark_mock) assert conn.host == "some_host" - assert conn.port == 1433 + assert conn.port is None assert conn.user == "user" assert conn.password != "passwd" assert conn.password.get_secret_value() == "passwd" assert conn.database == "database" - assert conn.jdbc_url == "jdbc:sqlserver://some_host:1433" + assert conn.jdbc_url == "jdbc:sqlserver://some_host" assert conn.jdbc_params == { "user": "user", "password": "passwd", "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", - "url": "jdbc:sqlserver://some_host:1433", + "url": "jdbc:sqlserver://some_host", "databaseName": "database", } assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + assert conn.instance_url == "mssql://some_host:1433/database" -def test_mssql_with_port(spark_mock): + +def test_mssql_with_custom_port(spark_mock): conn = MSSQL(host="some_host", port=5000, user="user", database="database", password="passwd", spark=spark_mock) assert conn.host == "some_host" @@ -124,6 +126,38 @@ def test_mssql_with_port(spark_mock): "databaseName": "database", } + assert conn.instance_url == "mssql://some_host:5000/database" + + +def test_mssql_with_instance_name(spark_mock): + conn = MSSQL( + host="some_host", + user="user", + database="database", + password="passwd", + extra={"instanceName": "myinstance"}, + spark=spark_mock, + ) + + assert conn.host == "some_host" + assert conn.port is None + assert conn.user == "user" + assert conn.password != "passwd" + assert conn.password.get_secret_value() == "passwd" + assert conn.database == "database" + + assert conn.jdbc_url == "jdbc:sqlserver://some_host" + assert conn.jdbc_params == { + "user": "user", + "password": "passwd", + "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", + "url": "jdbc:sqlserver://some_host", + "instanceName": "myinstance", + "databaseName": "database", + } + + assert conn.instance_url == 
"mssql://some_host\\myinstance/database" + def test_mssql_without_database_error(spark_mock): with pytest.raises(ValueError, match="field required"): @@ -145,12 +179,12 @@ def test_mssql_with_extra(spark_mock): spark=spark_mock, ) - assert conn.jdbc_url == "jdbc:sqlserver://some_host:1433" + assert conn.jdbc_url == "jdbc:sqlserver://some_host" assert conn.jdbc_params == { "user": "user", "password": "passwd", "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", - "url": "jdbc:sqlserver://some_host:1433", + "url": "jdbc:sqlserver://some_host", "databaseName": "database", "characterEncoding": "UTF-8", "trustServerCertificate": "true", diff --git a/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py b/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py index e95b34f1a..f2c68d939 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py @@ -92,6 +92,8 @@ def test_mysql(spark_mock): assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + assert conn.instance_url == "mysql://some_host:3306" + def test_mysql_with_port(spark_mock): conn = MySQL(host="some_host", port=5000, user="user", database="database", password="passwd", spark=spark_mock) @@ -113,6 +115,8 @@ def test_mysql_with_port(spark_mock): "useUnicode": "yes", } + assert conn.instance_url == "mysql://some_host:5000" + def test_mysql_without_database(spark_mock): conn = MySQL(host="some_host", user="user", password="passwd", spark=spark_mock) @@ -134,6 +138,8 @@ def test_mysql_without_database(spark_mock): "useUnicode": "yes", } + assert conn.instance_url == "mysql://some_host:3306" + def test_mysql_with_extra(spark_mock): conn = MySQL( diff --git a/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py b/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py index 892c86906..ae7bf87cd 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py @@ -113,6 +113,8 @@ def test_oracle(spark_mock): assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + assert conn.instance_url == "oracle://some_host:1521/sid" + def test_oracle_with_port(spark_mock): conn = Oracle(host="some_host", port=5000, user="user", sid="sid", password="passwd", spark=spark_mock) @@ -132,6 +134,8 @@ def test_oracle_with_port(spark_mock): "url": "jdbc:oracle:thin:@some_host:5000:sid", } + assert conn.instance_url == "oracle://some_host:5000/sid" + def test_oracle_uri_with_service_name(spark_mock): conn = Oracle(host="some_host", user="user", password="passwd", service_name="service", spark=spark_mock) @@ -144,6 +148,8 @@ def test_oracle_uri_with_service_name(spark_mock): "url": "jdbc:oracle:thin:@//some_host:1521/service", } + assert conn.instance_url == "oracle://some_host:1521/service" + def test_oracle_without_sid_and_service_name(spark_mock): with pytest.raises(ValueError, match="One of parameters ``sid``, ``service_name`` should be set, got none"): diff --git a/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py b/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py index 268525220..6e37417aa 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py @@ -93,6 +93,8 @@ def test_postgres(spark_mock): assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + 
assert conn.instance_url == "postgres://some_host:5432/database" + def test_postgres_with_port(spark_mock): conn = Postgres(host="some_host", port=5000, user="user", database="database", password="passwd", spark=spark_mock) @@ -115,6 +117,8 @@ def test_postgres_with_port(spark_mock): "stringtype": "unspecified", } + assert conn.instance_url == "postgres://some_host:5000/database" + def test_postgres_without_database_error(spark_mock): with pytest.raises(ValueError, match="field required"): diff --git a/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py b/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py index b71d7e8d1..bef65a554 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py @@ -92,6 +92,8 @@ def test_teradata(spark_mock): assert "password='passwd'" not in str(conn) assert "password='passwd'" not in repr(conn) + assert conn.instance_url == "teradata://some_host:1025" + def test_teradata_with_port(spark_mock): conn = Teradata(host="some_host", port=5000, user="user", database="database", password="passwd", spark=spark_mock) @@ -114,6 +116,8 @@ def test_teradata_with_port(spark_mock): "url": conn.jdbc_url, } + assert conn.instance_url == "teradata://some_host:5000" + def test_teradata_without_database(spark_mock): conn = Teradata(host="some_host", user="user", password="passwd", spark=spark_mock) From 39258dda255be512b252a3ebc15d8eec81b034a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 14:50:52 +0000 Subject: [PATCH 05/12] [DOP-16174] Fix S3 troubleshooting guide --- .../file_df_connection/spark_s3/troubleshooting.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/connection/file_df_connection/spark_s3/troubleshooting.rst b/docs/connection/file_df_connection/spark_s3/troubleshooting.rst index a08669af9..20b3b9898 100644 --- a/docs/connection/file_df_connection/spark_s3/troubleshooting.rst +++ b/docs/connection/file_df_connection/spark_s3/troubleshooting.rst @@ -43,7 +43,7 @@ Resulting logs will look like this .. dropdown:: See log - .. code:: txt + .. code:: text 23/08/03 11:25:10 DEBUG S3AFileSystem: Using S3ABlockOutputStream with buffer = disk; block=67108864; queue limit=4 23/08/03 11:25:10 DEBUG S3Guard: Metastore option source [core-default.xml] @@ -193,7 +193,7 @@ Most common mistakes No network access ^^^^^^^^^^^^^^^^^ -.. code:: txt +.. code:: text Caused by: java.net.ConnectException: Connection refused @@ -206,7 +206,7 @@ Mostly caused by: Using HTTPS protocol for HTTP port ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. code:: txt +.. code:: text Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message @@ -226,7 +226,7 @@ You should pass protocol explicitly: SSL certificate is self-signed ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. code:: txt +.. code:: text sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target @@ -249,7 +249,7 @@ But is is **NOT** recommended. Accessing S3 without domain-style access style support ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. code:: txt +.. 
code:: text Caused by: java.net.UnknownHostException: my-bucket.s3provider.com @@ -285,7 +285,7 @@ There are `different types of committers Date: Mon, 27 May 2024 14:12:16 +0000 Subject: [PATCH 06/12] [DOP-15764] Remove fetchsize from JDBCWriteOptions --- .github/workflows/data/db/tracked.txt | 5 +- docs/changelog/next_release/288.bugfix.rst | 1 + .../db_connection/greenplum/connection.py | 16 +++--- .../jdbc_connection/connection.py | 12 +++-- .../db_connection/jdbc_connection/options.py | 54 +++++++++++++++---- .../db_connection/jdbc_mixin/connection.py | 22 ++++---- .../db_connection/postgres/connection.py | 3 +- 7 files changed, 77 insertions(+), 36 deletions(-) create mode 100644 docs/changelog/next_release/288.bugfix.rst diff --git a/.github/workflows/data/db/tracked.txt b/.github/workflows/data/db/tracked.txt index 48980af41..2bb43b939 100644 --- a/.github/workflows/data/db/tracked.txt +++ b/.github/workflows/data/db/tracked.txt @@ -1,5 +1,6 @@ .github/workflows/data/db/** -onetl/db_connection/db_connection.py -onetl/db_connection/jdbc*.py +onetl/db_connection/db_connection/* onetl/db_connection/dialect_mixins/* +onetl/db_connection/jdbc_connection/* +onetl/db_connection/jdbc_mixin/* onetl/db/** diff --git a/docs/changelog/next_release/288.bugfix.rst b/docs/changelog/next_release/288.bugfix.rst new file mode 100644 index 000000000..82a095a8b --- /dev/null +++ b/docs/changelog/next_release/288.bugfix.rst @@ -0,0 +1 @@ +Remove ``fetchsize`` from ``JDBC.WriteOptions``. diff --git a/onetl/connection/db_connection/greenplum/connection.py b/onetl/connection/db_connection/greenplum/connection.py index be62afa5f..7ed60539b 100644 --- a/onetl/connection/db_connection/greenplum/connection.py +++ b/onetl/connection/db_connection/greenplum/connection.py @@ -10,6 +10,8 @@ from etl_entities.instance import Host +from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions + try: from pydantic.v1 import validator except (ImportError, AttributeError): @@ -346,17 +348,17 @@ def get_df_schema( self, source: str, columns: list[str] | None = None, - options: JDBCOptions | None = None, + options: JDBCReadOptions | None = None, ) -> StructType: log.info("|%s| Fetching schema of table %r ...", self.__class__.__name__, source) query = self.dialect.get_sql_query(source, columns=columns, limit=0, compact=True) - jdbc_options = self.JDBCOptions.parse(options).copy(update={"fetchsize": 0}) + jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 0}) log.debug("|%s| Executing SQL query (on driver):", self.__class__.__name__) log_lines(log, query, level=logging.DEBUG) - df = self._query_on_driver(query, jdbc_options) + df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict())) log.info("|%s| Schema fetched.", self.__class__.__name__) return df.schema @@ -368,10 +370,10 @@ def get_min_max_values( window: Window, hint: Any | None = None, where: Any | None = None, - options: JDBCOptions | None = None, + options: JDBCReadOptions | None = None, ) -> tuple[Any, Any]: log.info("|%s| Getting min and max values for %r ...", self.__class__.__name__, window.expression) - jdbc_options = self.JDBCOptions.parse(options).copy(update={"fetchsize": 1}) + jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 1}) query = self.dialect.get_sql_query( table=source, @@ -391,7 +393,7 @@ def get_min_max_values( log.info("|%s| Executing SQL query (on driver):", self.__class__.__name__) log_lines(log, query) - df = self._query_on_driver(query, jdbc_options) + 
df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict())) row = df.collect()[0] min_value = row["min"] max_value = row["max"] @@ -437,7 +439,7 @@ def _connector_params( **extra, } - def _options_to_connection_properties(self, options: JDBCOptions | JDBCExecuteOptions | JDBCFetchOptions): + def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions): # See https://github.com/pgjdbc/pgjdbc/pull/1252 # Since 42.2.9 Postgres JDBC Driver added new option readOnlyMode=transaction # Which is not a desired behavior, because `.fetch()` method should always be read-only diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py index d8d03cc6b..e6716ae5e 100644 --- a/onetl/connection/db_connection/jdbc_connection/connection.py +++ b/onetl/connection/db_connection/jdbc_connection/connection.py @@ -19,7 +19,7 @@ JDBCWriteOptions, ) from onetl.connection.db_connection.jdbc_mixin import JDBCMixin -from onetl.connection.db_connection.jdbc_mixin.options import JDBCOptions +from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions from onetl.hooks import slot, support_hooks from onetl.hwm import Window from onetl.log import log_lines, log_with_indent @@ -256,10 +256,12 @@ def _exclude_partition_options( self, options: JDBCReadOptions, fetchsize: int, - ) -> JDBCOptions: - return options.copy( - update={"fetchsize": fetchsize}, - exclude={"partition_column", "lower_bound", "upper_bound", "num_partitions", "partitioning_mode"}, + ) -> JDBCFetchOptions: + return self.FetchOptions.parse( + options.copy( + update={"fetchsize": fetchsize}, + exclude={"partition_column", "lower_bound", "upper_bound", "num_partitions", "partitioning_mode"}, + ).dict(), ) def _set_lower_upper_bound( diff --git a/onetl/connection/db_connection/jdbc_connection/options.py b/onetl/connection/db_connection/jdbc_connection/options.py index 12ac24193..cd4538f29 100644 --- a/onetl/connection/db_connection/jdbc_connection/options.py +++ b/onetl/connection/db_connection/jdbc_connection/options.py @@ -6,6 +6,8 @@ from enum import Enum from typing import Optional +from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions + try: from pydantic.v1 import Field, PositiveInt, root_validator except (ImportError, AttributeError): @@ -14,11 +16,15 @@ from typing_extensions import deprecated from onetl._internal import to_camel -from onetl.connection.db_connection.jdbc_mixin.options import JDBCOptions +from onetl.impl import GenericOptions # options from spark.read.jdbc which are populated by JDBCConnection methods GENERIC_PROHIBITED_OPTIONS = frozenset( ( + "user", + "password", + "driver", + "url", "table", "dbtable", "query", @@ -104,7 +110,7 @@ def __str__(self): return str(self.value) -class JDBCReadOptions(JDBCOptions): +class JDBCReadOptions(JDBCFetchOptions): """Spark JDBC reading options. .. note :: @@ -136,7 +142,8 @@ class JDBCReadOptions(JDBCOptions): class Config: known_options = READ_OPTIONS | READ_WRITE_OPTIONS - prohibited_options = JDBCOptions.Config.prohibited_options | GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS + prohibited_options = GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS + extra = "allow" alias_generator = to_camel # Options in DataFrameWriter.jdbc() method @@ -185,6 +192,14 @@ class Config: """ ''' + query_timeout: Optional[int] = Field(default=None, alias="queryTimeout") + """The number of seconds the driver will wait for a statement to execute. 
+ Zero means there is no limit. + + This option depends on driver implementation, + some drivers can check the timeout of each query instead of an entire JDBC batch. + """ + fetchsize: int = 100_000 """Fetch N rows from an opened cursor per one read round. @@ -380,7 +395,7 @@ def _partitioning_mode_actions(cls, values): return values -class JDBCWriteOptions(JDBCOptions): +class JDBCWriteOptions(GenericOptions): """Spark JDBC writing options. .. note :: @@ -406,7 +421,8 @@ class JDBCWriteOptions(JDBCOptions): class Config: known_options = WRITE_OPTIONS | READ_WRITE_OPTIONS - prohibited_options = JDBCOptions.Config.prohibited_options | GENERIC_PROHIBITED_OPTIONS | READ_OPTIONS + prohibited_options = GENERIC_PROHIBITED_OPTIONS | READ_OPTIONS + extra = "allow" alias_generator = to_camel if_exists: JDBCTableExistBehavior = Field(default=JDBCTableExistBehavior.APPEND, alias="mode") @@ -481,6 +497,14 @@ class Config: Renamed ``mode`` → ``if_exists`` """ + query_timeout: Optional[int] = Field(default=None, alias="queryTimeout") + """The number of seconds the driver will wait for a statement to execute. + Zero means there is no limit. + + This option depends on driver implementation, + some drivers can check the timeout of each query instead of an entire JDBC batch. + """ + batchsize: int = 20_000 """How many rows can be inserted per round trip. @@ -531,7 +555,7 @@ def _mode_is_deprecated(cls, values): return values -class JDBCSQLOptions(JDBCOptions): +class JDBCSQLOptions(GenericOptions): """Options specifically for SQL queries These options allow you to specify configurations for executing SQL queries @@ -580,10 +604,10 @@ class JDBCSQLOptions(JDBCOptions): """Number of jobs created by Spark to read the table content in parallel.""" # noqa: WPS322 lower_bound: Optional[int] = None - """Defines the starting boundary for partitioning the query's data. Mandatory if :obj:`~partition_column~ is set""" # noqa: WPS322 + """Defines the starting boundary for partitioning the query's data. Mandatory if :obj:`~partition_column` is set""" # noqa: WPS322 upper_bound: Optional[int] = None - """Sets the ending boundary for data partitioning. Mandatory if :obj:`~partition_column~ is set""" # noqa: WPS322 + """Sets the ending boundary for data partitioning. Mandatory if :obj:`~partition_column` is set""" # noqa: WPS322 session_init_statement: Optional[str] = None '''After each database session is opened to the remote DB and before starting to read data, @@ -603,6 +627,14 @@ class JDBCSQLOptions(JDBCOptions): """ ''' + query_timeout: Optional[int] = Field(default=None, alias="queryTimeout") + """The number of seconds the driver will wait for a statement to execute. + Zero means there is no limit. + + This option depends on driver implementation, + some drivers can check the timeout of each query instead of an entire JDBC batch. + """ + fetchsize: int = 100_000 """Fetch N rows from an opened cursor per one read round. 
@@ -624,7 +656,8 @@ class JDBCSQLOptions(JDBCOptions): class Config: known_options = READ_OPTIONS - {"partitioning_mode"} - prohibited_options = JDBCOptions.Config.prohibited_options | {"partitioning_mode"} + prohibited_options = GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS | {"partitioning_mode"} + extra = "allow" alias_generator = to_camel @root_validator(pre=True) @@ -645,4 +678,5 @@ def _check_partition_fields(cls, values): ) class JDBCLegacyOptions(JDBCReadOptions, JDBCWriteOptions): class Config: - prohibited_options = JDBCOptions.Config.prohibited_options + prohibited_options = GENERIC_PROHIBITED_OPTIONS + extra = "allow" diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index f42450b6e..8ab430751 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -9,6 +9,8 @@ from enum import Enum, auto from typing import TYPE_CHECKING, Callable, ClassVar, Optional, TypeVar +from onetl.impl.generic_options import GenericOptions + try: from pydantic.v1 import Field, PrivateAttr, SecretStr, validator except (ImportError, AttributeError): @@ -222,7 +224,7 @@ def fetch( def execute( self, statement: str, - options: JDBCExecuteOptions | JDBCMixinOptions | dict | None = None, + options: JDBCExecuteOptions | dict | None = None, ) -> DataFrame | None: """ **Immediately** execute DDL, DML or procedure/function **on Spark driver**. |support_hooks| @@ -267,7 +269,7 @@ def execute( log_lines(log, statement) call_options = ( - self.ExecuteOptions.parse(options.dict()) + self.ExecuteOptions.parse(options.dict()) # type: ignore if isinstance(options, JDBCMixinOptions) else self.ExecuteOptions.parse(options) ) @@ -302,7 +304,7 @@ def _check_java_class_imported(cls, spark): def _query_on_driver( self, query: str, - options: JDBCMixinOptions | JDBCFetchOptions | JDBCExecuteOptions, + options: JDBCFetchOptions | JDBCExecuteOptions, ) -> DataFrame: return self._execute_on_driver( statement=query, @@ -315,7 +317,7 @@ def _query_on_driver( def _query_optional_on_driver( self, query: str, - options: JDBCMixinOptions | JDBCFetchOptions, + options: JDBCFetchOptions, ) -> DataFrame | None: return self._execute_on_driver( statement=query, @@ -328,7 +330,7 @@ def _query_optional_on_driver( def _call_on_driver( self, query: str, - options: JDBCMixinOptions | JDBCExecuteOptions, + options: JDBCExecuteOptions, ) -> DataFrame | None: return self._execute_on_driver( statement=query, @@ -340,7 +342,7 @@ def _call_on_driver( def _get_jdbc_properties( self, - options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions, + options: GenericOptions, **kwargs, ) -> dict[str, str]: """ @@ -350,7 +352,7 @@ def _get_jdbc_properties( result.update(options.dict(by_alias=True, **kwargs)) return stringify(result) - def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions): + def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions): """ Converts human-readable Options class to ``java.util.Properties``. 
@@ -371,7 +373,7 @@ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExec ) return jdbc_options.asConnectionProperties() - def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions): + def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions): if not self._last_connection_and_options: # connection class can be used in multiple threads. # each Python thread creates its own thread in JVM @@ -413,7 +415,7 @@ def _execute_on_driver( statement: str, statement_type: JDBCStatementType, callback: Callable[..., T], - options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions, + options: JDBCFetchOptions | JDBCExecuteOptions, read_only: bool, ) -> T: """ @@ -435,7 +437,7 @@ def _execute_statement( self, jdbc_statement, statement: str, - options: JDBCMixinOptions | JDBCFetchOptions | JDBCExecuteOptions, + options: JDBCFetchOptions | JDBCExecuteOptions, callback: Callable[..., T], read_only: bool, ) -> T: diff --git a/onetl/connection/db_connection/postgres/connection.py b/onetl/connection/db_connection/postgres/connection.py index a79465613..132d9727f 100644 --- a/onetl/connection/db_connection/postgres/connection.py +++ b/onetl/connection/db_connection/postgres/connection.py @@ -13,7 +13,6 @@ from onetl.connection.db_connection.jdbc_mixin.options import ( JDBCExecuteOptions, JDBCFetchOptions, - JDBCOptions, ) from onetl.connection.db_connection.postgres.dialect import PostgresDialect from onetl.connection.db_connection.postgres.options import ( @@ -185,7 +184,7 @@ def instance_url(self) -> str: def _options_to_connection_properties( self, - options: JDBCOptions | JDBCFetchOptions | JDBCExecuteOptions, + options: JDBCFetchOptions | JDBCExecuteOptions, ): # noqa: WPS437 # See https://github.com/pgjdbc/pgjdbc/pull/1252 # Since 42.2.9 Postgres JDBC Driver added new option readOnlyMode=transaction From 9da92d70079ae16a31df874129d3e658008f9e7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 14:56:47 +0000 Subject: [PATCH 07/12] [DOP-16174] Prepare for release --- docs/changelog/0.11.1.rst | 13 +++++++++++++ docs/changelog/index.rst | 1 + docs/changelog/next_release/287.feature.rst | 1 - docs/changelog/next_release/288.bugfix.rst | 1 - 4 files changed, 14 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/0.11.1.rst delete mode 100644 docs/changelog/next_release/287.feature.rst delete mode 100644 docs/changelog/next_release/288.bugfix.rst diff --git a/docs/changelog/0.11.1.rst b/docs/changelog/0.11.1.rst new file mode 100644 index 000000000..3f0fd92eb --- /dev/null +++ b/docs/changelog/0.11.1.rst @@ -0,0 +1,13 @@ +0.11.1 (2024-05-27) +=================== + +Features +-------- + +- Change ``MSSQL.port`` default from ``1433`` to ``None``, allowing use of ``instanceName`` to detect port number. (:github:pull:`287`) + + +Bug Fixes +--------- + +- Remove ``fetchsize`` from ``JDBC.WriteOptions``. 
(:github:pull:`288`) diff --git a/docs/changelog/index.rst b/docs/changelog/index.rst index 62c93671d..4bdac9467 100644 --- a/docs/changelog/index.rst +++ b/docs/changelog/index.rst @@ -3,6 +3,7 @@ :caption: Changelog DRAFT + 0.11.1 0.11.0 0.10.2 0.10.1 diff --git a/docs/changelog/next_release/287.feature.rst b/docs/changelog/next_release/287.feature.rst deleted file mode 100644 index e2f09341f..000000000 --- a/docs/changelog/next_release/287.feature.rst +++ /dev/null @@ -1 +0,0 @@ -Change ``MSSQL.port`` default from ``1433`` to ``None``, allowing use of ``instanceName`` to detect port number. diff --git a/docs/changelog/next_release/288.bugfix.rst b/docs/changelog/next_release/288.bugfix.rst deleted file mode 100644 index 82a095a8b..000000000 --- a/docs/changelog/next_release/288.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Remove ``fetchsize`` from ``JDBC.WriteOptions``. From b30e5638ed278761e2d4966fa9b0a37033708f6e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 27 May 2024 20:55:30 +0000 Subject: [PATCH 08/12] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/codespell-project/codespell: v2.2.6 → v2.3.0](https://github.com/codespell-project/codespell/compare/v2.2.6...v2.3.0) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aa7bde988..baa40c29e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -52,7 +52,7 @@ repos: - --no-extra-eol - repo: https://github.com/codespell-project/codespell - rev: v2.2.6 + rev: v2.3.0 hooks: - id: codespell args: [-w] From d7fdd86db7f5fe17aaafb55c057d8de66caad382 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 27 May 2024 20:56:17 +0000 Subject: [PATCH 09/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/connection/db_connection/mssql/types.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connection/db_connection/mssql/types.rst b/docs/connection/db_connection/mssql/types.rst index 1052143ed..add3aea17 100644 --- a/docs/connection/db_connection/mssql/types.rst +++ b/docs/connection/db_connection/mssql/types.rst @@ -35,7 +35,7 @@ This is how MSSQL connector performs this: it will be populated by MSSQL. .. [2] - This is true only if DataFrame column is a ``StringType()``, because text value is parsed automatically to tagret column type. + This is true only if DataFrame column is a ``StringType()``, because text value is parsed automatically to target column type. But other types cannot be silently converted, like ``int -> text``. This requires explicit casting, see `DBWriter`_. 
From 615717052ca1ba4bf14b273ad78aff613762b693 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 27 May 2024 21:12:11 +0000 Subject: [PATCH 10/12] [DOP-16270] Fix pre-commit hooks errors --- docs/logging.rst | 2 +- onetl/hwm/auto_hwm.py | 2 +- setup.cfg | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/logging.rst b/docs/logging.rst index 8b7138452..a28a51cd4 100644 --- a/docs/logging.rst +++ b/docs/logging.rst @@ -3,7 +3,7 @@ Logging ======= -Logging is quite important to understant what's going on under the hood of onETL. +Logging is quite important to understand what's going on under the hood of onETL. Default logging level for Python interpreters is ``WARNING``, but most of onETL logs are in ``INFO`` level, so users usually don't see much. diff --git a/onetl/hwm/auto_hwm.py b/onetl/hwm/auto_hwm.py index 5c42f3579..5d3464332 100644 --- a/onetl/hwm/auto_hwm.py +++ b/onetl/hwm/auto_hwm.py @@ -38,6 +38,6 @@ def update(self: AutoDetectHWM, value: Any) -> AutoDetectHWM: def dict(self, **kwargs): serialized_data = super().dict(**kwargs) # as in HWM classes default value for 'value' may be any structure, - # e.g. frozendict for KeyValueHWM, there should unifed dict representation + # e.g. frozendict for KeyValueHWM, there should unified dict representation serialized_data.pop("value") return serialized_data diff --git a/setup.cfg b/setup.cfg index b72ff1b1d..d12261ed9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -440,4 +440,4 @@ show_error_codes = True disable_error_code = name-defined, misc [codespell] -ignore-words-list = INOUT, inout +ignore-words-list = INOUT, inout, thirdparty From fd6c11259436af9ee530e68d1b247f01e6e0b75e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 28 May 2024 07:22:45 +0000 Subject: [PATCH 11/12] [DOP-16270] Fix CHANGELOG --- docs/changelog/0.11.1.rst | 2 +- onetl/connection/db_connection/mssql/connection.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/changelog/0.11.1.rst b/docs/changelog/0.11.1.rst index 3f0fd92eb..92b936d29 100644 --- a/docs/changelog/0.11.1.rst +++ b/docs/changelog/0.11.1.rst @@ -1,4 +1,4 @@ -0.11.1 (2024-05-27) +0.11.1 (2024-05-28) =================== Features diff --git a/onetl/connection/db_connection/mssql/connection.py b/onetl/connection/db_connection/mssql/connection.py index 945103536..556cb4cb3 100644 --- a/onetl/connection/db_connection/mssql/connection.py +++ b/onetl/connection/db_connection/mssql/connection.py @@ -47,9 +47,13 @@ class MSSQL(JDBCConnection): host : str Host of MSSQL database. For example: ``test.mssql.domain.com`` or ``192.168.1.14`` - port : int, default: ``1433`` + port : int, default: ``None`` Port of MSSQL database + .. versionchanged:: 0.11.1 + Default value was changed from ``1433`` to ``None``, + to allow automatic port discovery with ``instanceName``. + user : str User, which have proper access to the database. 
For example: ``some_user`` From 90ba720206693b081a197d1d02c549691f0236da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 29 May 2024 07:38:02 +0000 Subject: [PATCH 12/12] [DOP-16270] Prepare for release --- docs/changelog/0.11.1.rst | 2 +- docs/connection/db_connection/hive/write.rst | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/changelog/0.11.1.rst b/docs/changelog/0.11.1.rst index 92b936d29..b6b1a4980 100644 --- a/docs/changelog/0.11.1.rst +++ b/docs/changelog/0.11.1.rst @@ -1,4 +1,4 @@ -0.11.1 (2024-05-28) +0.11.1 (2024-05-29) =================== Features diff --git a/docs/connection/db_connection/hive/write.rst b/docs/connection/db_connection/hive/write.rst index 0b286a83d..95e54a5a2 100644 --- a/docs/connection/db_connection/hive/write.rst +++ b/docs/connection/db_connection/hive/write.rst @@ -21,8 +21,15 @@ Examples # Use the Hive partitioning columns to group the data. Create only 20 files per Hive partition. # Also sort the data by column which most data is correlated with, reducing files size. write_df = df.repartition( - 20, "country", "business_date", "user_id" - ).sortWithinPartitions("country", "business_date", "user_id") + 20, + "country", + "business_date", + "user_id", + ).sortWithinPartitions( + "country", + "business_date", + "user_id", + ) writer = DBWriter( connection=hive,