From f2ed8706eb7d6e21bf47aaf8e2f139277ae5a6d1 Mon Sep 17 00:00:00 2001 From: Konstantin Volkov Date: Mon, 23 Sep 2024 18:35:38 +0300 Subject: [PATCH 1/2] Add support of streaming insert statement. --- clickhouse_sqlalchemy/drivers/native/base.py | 10 ++++- clickhouse_sqlalchemy/sql/__init__.py | 4 +- clickhouse_sqlalchemy/sql/dml.py | 24 ++++++++++++ docs/features.rst | 18 +++++++++ tests/sql/test_insert.py | 41 +++++++++++++++++++- 5 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 clickhouse_sqlalchemy/sql/dml.py diff --git a/clickhouse_sqlalchemy/drivers/native/base.py b/clickhouse_sqlalchemy/drivers/native/base.py index b8cc37d6..1e455c67 100644 --- a/clickhouse_sqlalchemy/drivers/native/base.py +++ b/clickhouse_sqlalchemy/drivers/native/base.py @@ -7,6 +7,7 @@ from ..base import ( ClickHouseDialect, ClickHouseExecutionContextBase, ClickHouseSQLCompiler, ) +from ...sql.dml import Insert from sqlalchemy.engine.interfaces import ExecuteStyle from sqlalchemy import __version__ as sqlalchemy_version @@ -20,10 +21,15 @@ class ClickHouseExecutionContext(ClickHouseExecutionContextBase): def pre_exec(self): + if not self.isinsert: + return # Always do executemany on INSERT with VALUES clause. 
- if (self.isinsert and self.compiled.statement.select is None and - self.parameters != [{}]): + if self.compiled.statement.select is None and self.parameters != [{}]: self.execute_style = ExecuteStyle.EXECUTEMANY + # Stream rows from the iterable passed to Insert.values_iterator(). + if (isinstance(self.compiled.statement, Insert) and + self.compiled.statement._values_iterator is not None): + self.parameters = self.compiled.statement._values_iterator class ClickHouseNativeSQLCompiler(ClickHouseSQLCompiler): diff --git a/clickhouse_sqlalchemy/sql/__init__.py b/clickhouse_sqlalchemy/sql/__init__.py index 7516b593..9ba0a304 100644 --- a/clickhouse_sqlalchemy/sql/__init__.py +++ b/clickhouse_sqlalchemy/sql/__init__.py @@ -1,6 +1,6 @@ from .schema import Table, MaterializedView from .selectable import Select, select +from .dml import Insert, insert - -__all__ = ('Table', 'MaterializedView', 'Select', 'select') +__all__ = ('Table', 'MaterializedView', 'Select', 'select', 'Insert', 'insert') diff --git a/clickhouse_sqlalchemy/sql/dml.py b/clickhouse_sqlalchemy/sql/dml.py new file mode 100644 index 00000000..b633af84 --- /dev/null +++ b/clickhouse_sqlalchemy/sql/dml.py @@ -0,0 +1,24 @@ +from sqlalchemy.sql.dml import Insert as BaseInsert + +__all__ = ('Insert', 'insert') + + +class Insert(BaseInsert): + """INSERT statement that can take its rows from an iterable.""" + + # Set by values_iterator(); None means no row iterable was supplied. + _values_iterator = None + + def values_iterator(self, columns, iterator): + """Take the rows to insert into ``columns`` from ``iterator``. + + The iterable is stored on the statement; the native driver's + pre_exec() substitutes it for the execution parameters. + """ + self._values_iterator = iterator + # Single all-None placeholder row naming the target columns. + self._multi_values = ([{column: None for column in columns}],) + return self + + +insert = Insert diff --git a/docs/features.rst b/docs/features.rst index 20dce7e8..d98103af 100644 --- a/docs/features.rst +++ b/docs/features.rst @@ -694,6 +694,24 @@ INSERT FROM SELECT statement: .from_select(['day', 'value'], select_query) ) +Streaming insert: + + .. 
code-block:: python + + from datetime import datetime + from clickhouse_sqlalchemy import sql + + def generator(): + for i in range(100): + yield [datetime.now(), 1, i] + + session.execute( + sql.insert(Statistics).values_iterator( + [Statistics.date, Statistics.sign, Statistics.grouping], + generator() + ) + ) + UPDATE and DELETE ----------------- diff --git a/tests/sql/test_insert.py b/tests/sql/test_insert.py index 26d2fdc1..c3513f28 100644 --- a/tests/sql/test_insert.py +++ b/tests/sql/test_insert.py @@ -1,6 +1,6 @@ from sqlalchemy import Column, literal_column, select -from clickhouse_sqlalchemy import types, Table, engines +from clickhouse_sqlalchemy import types, Table, engines, sql from tests.testcase import NativeSessionTestCase from tests.util import require_server_version @@ -39,3 +39,42 @@ def test_insert_map(self): rv = self.session.execute(select(table.c.x)).scalar() self.assertEqual(rv, dict_map) + + @require_server_version(19, 3, 3) + def test_insert_iterator(self): + table = Table( + 't', self.metadata(), + Column('x', types.String, primary_key=True), + engines.Log() + ) + + def generator(): + yield ["foo"] + yield ["bar"] + + with self.create_table(table): + query = sql.insert(table).values_iterator([table.c.x], generator()) + self.session.execute(query) + + result = list(self.session.execute(select(table.c.x))) + print(result) + self.assertListEqual(result, [('foo',), ('bar',)]) + + @require_server_version(19, 3, 3) + def test_insert_iterator_list(self): + table = Table( + 't', self.metadata(), + Column('x', types.String, primary_key=True), + engines.Log() + ) + + with self.create_table(table): + query = sql.insert(table).values_iterator( + [table.c.x], + [["foo"], ["bar"]] + ) + self.session.execute(query) + + result = list(self.session.execute(select(table.c.x))) + print(result) + self.assertListEqual(result, [('foo',), ('bar',)]) From ea22e83d56f746c7a5d806aef6d75d3d9bd72439 Mon Sep 17 00:00:00 2001 From: Konstantin Volkov Date: Mon, 23 Sep 2024 
18:43:18 +0300 Subject: [PATCH 2/2] Remove debug print calls from streaming insert tests. --- tests/sql/test_insert.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/sql/test_insert.py b/tests/sql/test_insert.py index c3513f28..65bd96f2 100644 --- a/tests/sql/test_insert.py +++ b/tests/sql/test_insert.py @@ -57,7 +57,6 @@ def generator(): self.session.execute(query) result = list(self.session.execute(select(table.c.x))) - print(result) self.assertListEqual(result, [('foo',), ('bar',)]) @require_server_version(19, 3, 3) @@ -76,5 +75,4 @@ def test_insert_iterator_list(self): self.session.execute(query) result = list(self.session.execute(select(table.c.x))) - print(result) self.assertListEqual(result, [('foo',), ('bar',)])