
Add chunk_record_size argument
chezou committed Aug 17, 2024
1 parent ef3887a commit a0767f3
Showing 1 changed file with 14 additions and 3 deletions.
pytd/writer.py: 17 changes (14 additions & 3 deletions)
@@ -311,7 +311,14 @@ class BulkImportWriter(Writer):
"""

def write_dataframe(
self, dataframe, table, if_exists, fmt="csv", keep_list=False, max_workers=5
self,
dataframe,
table,
if_exists,
fmt="csv",
keep_list=False,
max_workers=5,
chunk_record_size=10_000,
):
"""Write a given DataFrame to a Treasure Data table.
@@ -407,10 +414,14 @@ def write_dataframe
Or, you can use :func:`Client.load_table_from_dataframe` function as well.
>>> client.load_table_from_dataframe(df, "bulk_import", keep_list=True)
max_workers : int, optional, default: 5
The maximum number of threads that can be used to execute the given calls.
This is used only when ``fmt`` is ``msgpack``.
chunk_record_size : int, optional, default: 10_000
The number of records to be written in a single file. This is used only when
``fmt`` is ``msgpack``.
"""
if self.closed:
raise RuntimeError("this writer is already closed and no longer available")
@@ -439,7 +450,7 @@ def write_dataframe
_replace_pd_na(dataframe)

records = dataframe.to_dict(orient="records")
for group in zip_longest(*(iter(records),) * 10000):
for group in zip_longest(*(iter(records),) * chunk_record_size):
fp = io.BytesIO()
fp = self._write_msgpack_stream(group, fp)
fps.append(fp)
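
For context, `zip_longest(*(iter(records),) * chunk_record_size)` is the standard-library idiom for splitting a record list into fixed-size groups: repeating the same iterator `chunk_record_size` times makes each `zip_longest` step consume that many records, and the final group is padded with `None`. A minimal standalone sketch of the pattern (the toy records and printed output are illustrative only, not taken from this commit):

```python
from itertools import zip_longest

# Toy stand-in for dataframe.to_dict(orient="records")
records = [{"id": i} for i in range(7)]
chunk_record_size = 3

# Repeating one shared iterator N times makes zip_longest pull N records per group;
# the last group is padded with None when len(records) is not a multiple of N.
for group in zip_longest(*(iter(records),) * chunk_record_size):
    chunk = [r for r in group if r is not None]  # drop the padding
    print(chunk)

# [{'id': 0}, {'id': 1}, {'id': 2}]
# [{'id': 3}, {'id': 4}, {'id': 5}]
# [{'id': 6}]
```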
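As a usage sketch, the new argument can be passed when writing a DataFrame in msgpack format. The connection details below are placeholders, and forwarding `chunk_record_size` through `Client.load_table_from_dataframe` assumes that method passes extra keyword arguments on to `BulkImportWriter.write_dataframe`, as the existing `keep_list` example in the docstring suggests:

```python
import pandas as pd
import pytd

client = pytd.Client(apikey="<YOUR_API_KEY>", database="sample_db")  # placeholder credentials
df = pd.DataFrame({"a": range(100_000), "b": range(100_000)})

# With fmt="msgpack", records are written in chunks of chunk_record_size rows
# per file and uploaded by up to max_workers threads.
client.load_table_from_dataframe(
    df,
    "bulk_import_test",
    writer="bulk_import",
    if_exists="overwrite",
    fmt="msgpack",
    chunk_record_size=50_000,
    max_workers=5,
)
```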
