Enable Multithreading on msgpack Chunking in BulkImportWriter (#142)
* Enable multithreading on msgpack chunking

* remove redundant parameter from different PR

* Slight refactor
DavidLandup0 authored Dec 6, 2024
1 parent 6490412 · commit 7357c4e
Showing 1 changed file with 22 additions and 12 deletions.
pytd/writer.py (22 additions & 12 deletions)
@@ -452,19 +452,29 @@ def write_dataframe(
             _replace_pd_na(dataframe)
             num_rows = len(dataframe)
             # chunk number of records should not exceed 200 to avoid OSError
-            _chunk_record_size = max(chunk_record_size, num_rows//200)
+            _chunk_record_size = max(chunk_record_size, num_rows // 200)
             try:
-                for start in range(0, num_rows, _chunk_record_size):
-                    records = dataframe.iloc[
-                        start : start + _chunk_record_size
-                    ].to_dict(orient="records")
-                    fp = tempfile.NamedTemporaryFile(
-                        suffix=".msgpack.gz", delete=False
-                    )
-                    fp = self._write_msgpack_stream(records, fp)
-                    fps.append(fp)
-                    stack.callback(os.unlink, fp.name)
-                    stack.callback(fp.close)
+                with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                    futures = []
+                    for start in range(0, num_rows, _chunk_record_size):
+                        records = dataframe.iloc[
+                            start : start + _chunk_record_size
+                        ].to_dict(orient="records")
+                        fp = tempfile.NamedTemporaryFile(
+                            suffix=".msgpack.gz", delete=False
+                        )
+                        futures.append(
+                            (
+                                start,
+                                executor.submit(
+                                    self._write_msgpack_stream, records, fp
+                                ),
+                            )
+                        )
+                        stack.callback(os.unlink, fp.name)
+                        stack.callback(fp.close)
+                    for start, future in sorted(futures):
+                        fps.append(future.result())
             except OSError as e:
                 raise RuntimeError(
                     "failed to create a temporary file. "
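Note on the `_chunk_record_size` guard: despite its wording, the comment means the number of chunks (not records per chunk) should stay at or below 200. Each chunk becomes its own `NamedTemporaryFile`, so an unbounded chunk count can exhaust open file descriptors and raise `OSError`. A quick sketch of the arithmetic (the 10_000 default for `chunk_record_size` here is an assumption for illustration, not taken from this diff):

import math

def effective_chunk_size(num_rows: int, chunk_record_size: int = 10_000) -> int:
    # Mirrors the guard in write_dataframe: grow the chunk size with the
    # row count so that at most ~200 chunks (one temp file each) are created.
    return max(chunk_record_size, num_rows // 200)

for num_rows in (50_000, 2_000_000, 10_000_000):
    size = effective_chunk_size(num_rows)
    chunks = math.ceil(num_rows / size)
    print(f"{num_rows:,} rows -> chunks of {size:,} -> {chunks} chunks")

# 50,000 rows -> chunks of 10,000 -> 5 chunks
# 2,000,000 rows -> chunks of 10,000 -> 200 chunks
# 10,000,000 rows -> chunks of 50,000 -> 200 chunks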

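For reference, a minimal self-contained sketch of the fan-out/fan-in pattern the commit introduces: submit each chunk to a `ThreadPoolExecutor`, tag the future with the chunk's starting offset, and sort the `(start, future)` pairs afterwards so results come back in row order even when workers finish out of order. Here `serialize_chunk` is a hypothetical stand-in for `_write_msgpack_stream`, and the gzip payload stands in for real msgpack packing.

import gzip
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

def serialize_chunk(records, fp):
    # Hypothetical stand-in for BulkImportWriter._write_msgpack_stream:
    # compress one chunk of records into the given temp file and return it.
    with gzip.GzipFile(mode="wb", fileobj=fp) as gz:
        gz.write(repr(records).encode("utf-8"))  # msgpack packing elided
    fp.flush()
    return fp

records = [{"id": i} for i in range(10)]
chunk_size = 4
fps = []
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    for start in range(0, len(records), chunk_size):
        chunk = records[start : start + chunk_size]
        fp = tempfile.NamedTemporaryFile(suffix=".msgpack.gz", delete=False)
        # Tag each future with its chunk offset so results can be re-ordered.
        futures.append((start, executor.submit(serialize_chunk, chunk, fp)))
    # sorted() orders the pairs by offset, restoring submission order
    # regardless of which worker finished first.
    for start, future in sorted(futures):
        fps.append(future.result())

print([os.path.basename(fp.name) for fp in fps])
for fp in fps:
    fp.close()
    os.unlink(fp.name)

Sorting `(start, future)` tuples is safe here because every `start` is distinct, so the comparison never falls through to the futures themselves.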