Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Dec 3, 2024
2 parents 694a71a + 8d1e395 commit 57754e4
Show file tree
Hide file tree
Showing 20 changed files with 1,559 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ repos:
- black==24.4.2

- repo: https://github.com/pycqa/bandit
rev: 1.7.10
rev: 1.8.0
hooks:
- id: bandit
args:
Expand Down
15 changes: 15 additions & 0 deletions docs/changelog/0.12.5.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
0.12.5 (2024-12-03)
===================

Improvements
------------

- Use ``sipHash64`` instead of ``md5`` in Clickhouse for reading data with ``{"partitioning_mode": "hash"}``, as it is 5 times faster.
- Use ``hashtext`` instead of ``md5`` in Postgres for reading data with ``{"partitioning_mode": "hash"}``, as it is 3-5 times faster.
- Use ``BINARY_CHECKSUM`` instead of ``HASHBYTES`` in MSSQL for reading data with ``{"partitioning_mode": "hash"}``, as it is 5 times faster.

Big fixes
---------

- In JDBC sources wrap ``MOD(partitionColumn, numPartitions)`` with ``ABS(...)`` to make al returned values positive. This prevents data sked.
- Fix reading table data from MSSQL using ``{"partitioning_mode": "hash"}`` with ``partitionColumn`` of integer type.
1 change: 1 addition & 0 deletions docs/changelog/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
:caption: Changelog

DRAFT
0.12.5
0.12.4
0.12.3
0.12.2
Expand Down
2 changes: 1 addition & 1 deletion onetl/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.12.4
0.12.5
9 changes: 7 additions & 2 deletions onetl/connection/db_connection/clickhouse/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@

class ClickhouseDialect(JDBCDialect):
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
return f"halfMD5({partition_column}) % {num_partitions}"
# SipHash is 3 times faster thah MD5
# https://clickhouse.com/docs/en/sql-reference/functions/hash-functions#siphash64
return f"sipHash64({partition_column}) % {num_partitions}"

def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"{partition_column} % {num_partitions}"
# Return positive value even for negative input.
# Don't use positiveModulo as it is 4-5 times slower:
# https://clickhouse.com/docs/en/sql-reference/functions/arithmetic-functions#positivemoduloa-b
return f"abs({partition_column} % {num_partitions})"

def get_max_value(self, value: Any) -> str:
# Max function in Clickhouse returns 0 instead of NULL for empty table
Expand Down
15 changes: 11 additions & 4 deletions onetl/connection/db_connection/jdbc_connection/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,10 @@ class Config:
.. note::
Column type depends on :obj:`~partitioning_mode`.
* ``partitioning_mode="range"`` requires column to be an integer or date (can be NULL, but not recommended).
* ``partitioning_mode="hash"`` requires column to be an string (NOT NULL).
* ``partitioning_mode="range"`` requires column to be an integer, date or timestamp (can be NULL, but not recommended).
* ``partitioning_mode="hash"`` accepts any column type (NOT NULL).
* ``partitioning_mode="mod"`` requires column to be an integer (NOT NULL).
See documentation for :obj:`~partitioning_mode` for more details"""

num_partitions: PositiveInt = Field(default=1, alias="numPartitions")
Expand Down Expand Up @@ -256,6 +255,10 @@ class Config:
Where ``stride=(upper_bound - lower_bound) / num_partitions``.
.. note::
Can be used only with columns of integer, date or timestamp types.
.. note::
:obj:`~lower_bound`, :obj:`~upper_bound` and :obj:`~num_partitions` are used just to
Expand Down Expand Up @@ -297,7 +300,7 @@ class Config:
.. note::
The hash function implementation depends on RDBMS. It can be ``MD5`` or any other fast hash function,
or expression based on this function call.
or expression based on this function call. Usually such functions accepts any column type as an input.
* ``mod``
Allocate each executor a set of values based on modulus of the :obj:`~partition_column` column.
Expand Down Expand Up @@ -325,6 +328,10 @@ class Config:
SELECT ... FROM table
WHERE (partition_column mod num_partitions) = num_partitions-1 -- upper_bound
.. note::
Can be used only with columns of integer type.
.. versionadded:: 0.5.0
Examples
Expand Down
9 changes: 6 additions & 3 deletions onetl/connection/db_connection/mssql/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@


class MSSQLDialect(JDBCDialect):
# https://docs.microsoft.com/ru-ru/sql/t-sql/functions/hashbytes-transact-sql?view=sql-server-ver16
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
return f"CONVERT(BIGINT, HASHBYTES ('SHA', {partition_column})) % {num_partitions}"
# CHECKSUM/BINARY_CHECKSUM are faster than MD5 in 5 times:
# https://stackoverflow.com/a/4691861/23601543
# https://learn.microsoft.com/en-us/sql/t-sql/functions/checksum-transact-sql?view=sql-server-ver16
return f"ABS(BINARY_CHECKSUM({partition_column})) % {num_partitions}"

def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"{partition_column} % {num_partitions}"
# Return positive value even for negative input
return f"ABS({partition_column} % {num_partitions})"

def get_sql_query(
self,
Expand Down
7 changes: 5 additions & 2 deletions onetl/connection/db_connection/mysql/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@

class MySQLDialect(JDBCDialect):
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
return f"MOD(CONV(CONV(RIGHT(MD5({partition_column}), 16), 16, 2), 2, 10), {num_partitions})"
# MD5 is the fastest hash function https://stackoverflow.com/a/3118889/23601543
# But it returns 32 char string (128 bit), which we need to convert to integer
return f"CAST(CONV(RIGHT(MD5({partition_column}), 16), 16, 10) AS UNSIGNED) % {num_partitions}"

def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"MOD({partition_column}, {num_partitions})"
# Return positive value even for negative input
return f"ABS({partition_column} % {num_partitions})"

def escape_column(self, value: str) -> str:
return f"`{value}`"
Expand Down
3 changes: 2 additions & 1 deletion onetl/connection/db_connection/oracle/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
return f"ora_hash({partition_column}, {num_partitions - 1})"

def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"MOD({partition_column}, {num_partitions})"
# Return positive value even for negative input
return f"ABS(MOD({partition_column}, {num_partitions}))"

def _serialize_datetime(self, value: datetime) -> str:
result = value.strftime("%Y-%m-%d %H:%M:%S")
Expand Down
8 changes: 5 additions & 3 deletions onetl/connection/db_connection/postgres/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@


class PostgresDialect(NotSupportHint, JDBCDialect):
# https://stackoverflow.com/a/9812029
def get_partition_column_hash(self, partition_column: str, num_partitions: int) -> str:
return f"('x'||right(md5('{partition_column}'), 16))::bit(32)::bigint % {num_partitions}"
# hashtext is about 3-5 times faster than MD5 (tested locally)
# https://postgrespro.com/list/thread-id/1506406
return f"abs(hashtext({partition_column}::text)) % {num_partitions}"

def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"{partition_column} % {num_partitions}"
# Return positive value even for negative input
return f"abs({partition_column} % {num_partitions})"

def _serialize_datetime(self, value: datetime) -> str:
result = value.isoformat()
Expand Down
3 changes: 2 additions & 1 deletion onetl/connection/db_connection/teradata/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def get_partition_column_hash(self, partition_column: str, num_partitions: int)
return f"HASHAMP(HASHBUCKET(HASHROW({partition_column}))) mod {num_partitions}"

def get_partition_column_mod(self, partition_column: str, num_partitions: int) -> str:
return f"{partition_column} mod {num_partitions}"
# Return positive value even for negative input
return f"ABS({partition_column} mod {num_partitions})"

def _serialize_datetime(self, value: datetime) -> str:
result = value.isoformat()
Expand Down
3 changes: 2 additions & 1 deletion tests/fixtures/processing/base_processing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import secrets
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import date, datetime, timedelta
Expand Down Expand Up @@ -137,7 +138,7 @@ def create_pandas_df(
elif "float" in column_name:
values[column].append(float(f"{i}.{i}"))
elif "text" in column_name:
values[column].append("This line is made to test the work")
values[column].append(secrets.token_hex(16))
elif "datetime" in column_name:
rand_second = randint(0, i * time_multiplier) # noqa: S311
values[column].append(self.current_datetime() + timedelta(seconds=rand_second))
Expand Down
3 changes: 2 additions & 1 deletion tests/fixtures/processing/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import secrets
from collections import defaultdict
from datetime import date, datetime, timedelta
from logging import getLogger
Expand Down Expand Up @@ -74,7 +75,7 @@ def create_pandas_df(self, min_id: int = 1, max_id: int | None = None) -> pandas
elif "float" in column_name:
values[column_name].append(float(f"{i}.{i}"))
elif "text" in column_name:
values[column_name].append("This line is made to test the work")
values[column_name].append(secrets.token_hex(16))
elif "datetime" in column_name:
rand_second = randint(0, i * time_multiplier) # noqa: S311
# Clickhouse DATETIME format has time range: 00:00:00 through 23:59:59
Expand Down
3 changes: 2 additions & 1 deletion tests/fixtures/processing/mongodb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import secrets
from collections import defaultdict
from datetime import datetime, timedelta
from logging import getLogger
Expand Down Expand Up @@ -149,7 +150,7 @@ def create_pandas_df(self, min_id: int = 1, max_id: int | None = None) -> pandas
elif "float" in column_name:
values[column_name].append(float(f"{i}.{i}"))
elif "text" in column_name:
values[column_name].append("This line is made to test the work")
values[column_name].append(secrets.token_hex(16))
elif "datetime" in column_name:
rand_second = randint(0, i * time_multiplier) # noqa: S311
now = self.current_datetime() + timedelta(seconds=rand_second)
Expand Down
Loading

0 comments on commit 57754e4

Please sign in to comment.