Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of streaming insert statement. #338

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions clickhouse_sqlalchemy/drivers/native/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ..base import (
ClickHouseDialect, ClickHouseExecutionContextBase, ClickHouseSQLCompiler,
)
from ...sql.dml import Insert
from sqlalchemy.engine.interfaces import ExecuteStyle
from sqlalchemy import __version__ as sqlalchemy_version

Expand All @@ -20,10 +21,14 @@

class ClickHouseExecutionContext(ClickHouseExecutionContextBase):
def pre_exec(self):
if not self.isinsert:
return
# Always do executemany on INSERT with VALUES clause.
if (self.isinsert and self.compiled.statement.select is None and
self.parameters != [{}]):
if self.compiled.statement.select is None and self.parameters != [{}]:
self.execute_style = ExecuteStyle.EXECUTEMANY
if (isinstance(self.compiled.statement, Insert) and
self.compiled.statement._values_iterator):
self.parameters = self.compiled.statement._values_iterator


class ClickHouseNativeSQLCompiler(ClickHouseSQLCompiler):
Expand Down
4 changes: 2 additions & 2 deletions clickhouse_sqlalchemy/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

from .schema import Table, MaterializedView
from .selectable import Select, select
from .dml import Insert, insert


__all__ = ('Table', 'MaterializedView', 'Select', 'select')
__all__ = ('Table', 'MaterializedView', 'Select', 'select', 'Insert', 'insert')
15 changes: 15 additions & 0 deletions clickhouse_sqlalchemy/sql/dml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from sqlalchemy.sql.dml import Insert as BaseInsert

__all__ = ('Insert', 'insert')


class Insert(BaseInsert):
_values_iterator: None

def values_iterator(self, columns, iterator):
self._values_iterator = iterator
self._multi_values = ([{column: None for column in columns}],)
return self


insert = Insert
17 changes: 17 additions & 0 deletions docs/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,23 @@ INSERT FROM SELECT statement:
.from_select(['day', 'value'], select_query)
)

Streaming insert:

.. code-block:: python
from datetime import datetime
from clickhouse_sqlalchemy import sql

def generator():
for i in range(100):
yield [datetime.now(), 1, i]

session.execute(
sql.insert(Statistics).values_iterator(
[Statistics.date, Statistics.sign, Statistics.grouping],
generator()
)
)

UPDATE and DELETE
-----------------

Expand Down
39 changes: 38 additions & 1 deletion tests/sql/test_insert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from sqlalchemy import Column, literal_column, select

from clickhouse_sqlalchemy import types, Table, engines
from clickhouse_sqlalchemy import types, Table, engines, sql
from tests.testcase import NativeSessionTestCase
from tests.util import require_server_version

Expand Down Expand Up @@ -39,3 +39,40 @@ def test_insert_map(self):

rv = self.session.execute(select(table.c.x)).scalar()
self.assertEqual(rv, dict_map)

@require_server_version(19, 3, 3)
def test_insert_iterator(self):
table = Table(
't', self.metadata(),
Column('x', types.String, primary_key=True),
engines.Log()
)

def generator():
yield ["foo"]
yield ["bar"]

with self.create_table(table):
query = sql.insert(table).values_iterator([table.c.x], generator())
self.session.execute(query)

result = list(self.session.execute(select(table.c.x)))
self.assertListEqual(result, [('foo',), ('bar',)])

@require_server_version(19, 3, 3)
def test_insert_iterator_list(self):
table = Table(
't', self.metadata(),
Column('x', types.String, primary_key=True),
engines.Log()
)

with self.create_table(table):
query = sql.insert(table).values_iterator(
[table.c.x],
[["foo"], ["bar"]]
)
self.session.execute(query)

result = list(self.session.execute(select(table.c.x)))
self.assertListEqual(result, [('foo',), ('bar',)])