Convert dataframe to dict by chunk to reduce memory consumption
chezou committed Aug 27, 2024
1 parent 80ed76b commit bc69372
Showing 2 changed files with 15 additions and 11 deletions.
12 changes: 6 additions & 6 deletions pytd/tests/test_writer.py
@@ -295,10 +295,10 @@ def test_write_dataframe_msgpack_with_int_na(self):
             ],
             dtype="Int64",
         )
-        expected_list = (
+        expected_list = [
             {"a": 1, "b": 2, "c": None, "time": 1234},
             {"a": 3, "b": 4, "c": 5, "time": 1234},
-        )
+        ]
         self.writer._write_msgpack_stream = MagicMock()
         with patch("pytd.writer.os.unlink"):
             self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack")
@@ -315,10 +315,10 @@ def test_write_dataframe_msgpack_with_string_na(self):
             dtype="string",
         )
         df["time"] = 1234
-        expected_list = (
+        expected_list = [
             {"a": "foo", "b": "bar", "c": None, "time": 1234},
             {"a": "buzz", "b": "buzz", "c": "alice", "time": 1234},
-        )
+        ]
         self.writer._write_msgpack_stream = MagicMock()
         with patch("pytd.writer.os.unlink"):
             self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack")
@@ -334,10 +334,10 @@ def test_write_dataframe_msgpack_with_boolean_na(self):
             dtype="boolean",
         )
         df["time"] = 1234
-        expected_list = (
+        expected_list = [
             {"a": "true", "b": "false", "c": None, "time": 1234},
             {"a": "false", "b": "true", "c": "true", "time": 1234},
-        )
+        ]
         self.writer._write_msgpack_stream = MagicMock()
         with patch("pytd.writer.os.unlink"):
             self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack")
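
Note on the test change above: pandas' to_dict(orient="records") returns a
plain Python list of row dicts, so each chunk handed to the mocked
_write_msgpack_stream is a list; the expected values accordingly change from
tuples to lists. A minimal standalone sketch (illustrative only, not the
library's code):

    import pandas as pd

    df = pd.DataFrame({"a": [1, 3], "b": [2, 4]})
    records = df.to_dict(orient="records")
    print(records)  # [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
    assert isinstance(records, list)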
14 changes: 9 additions & 5 deletions pytd/writer.py
@@ -448,11 +448,15 @@ def write_dataframe(
         elif fmt == "msgpack":
             _replace_pd_na(dataframe)

-            records = dataframe.to_dict(orient="records")
             try:
-                for group in zip_longest(*(iter(records),) * chunk_record_size):
-                    fp = tempfile.NamedTemporaryFile(suffix=".msgpack.gz", delete=False)
-                    fp = self._write_msgpack_stream(group, fp)
+                for start in range(0, len(dataframe), chunk_record_size):
+                    records = dataframe.iloc[
+                        start : start + chunk_record_size
+                    ].to_dict(orient="records")
+                    fp = tempfile.NamedTemporaryFile(
+                        suffix=".msgpack.gz", delete=False
+                    )
+                    fp = self._write_msgpack_stream(records, fp)
                     fps.append(fp)
                     stack.callback(os.unlink, fp.name)
                     stack.callback(fp.close)
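
This hunk is the memory fix named in the commit title. The old code
materialized the entire dataframe as a list of dicts up front and then
grouped it with zip_longest, which holds every record in memory at once
and, since zip_longest pads with its default fillvalue, fills the final
group with None entries. The new loop slices chunk_record_size rows with
iloc and converts only that slice, so peak memory is bounded by the chunk
size and no padding occurs. A simplified sketch of the pattern (function
name and sizes are illustrative, not pytd's API):

    import pandas as pd

    def iter_record_chunks(dataframe, chunk_record_size=10_000):
        # Convert one slice at a time; only chunk_record_size rows are
        # ever held as Python dicts, and the last chunk is simply
        # shorter (no None padding, unlike the zip_longest grouping).
        for start in range(0, len(dataframe), chunk_record_size):
            yield dataframe.iloc[start : start + chunk_record_size].to_dict(
                orient="records"
            )

    df = pd.DataFrame({"a": range(5), "time": 1234})
    for chunk in iter_record_chunks(df, chunk_record_size=2):
        print(len(chunk), chunk)  # chunks of 2, 2, then 1 records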
@@ -543,7 +547,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5):
             bulk_import.delete()
             raise RuntimeError(f"failed to upload file: {e}")

-        logger.info(f"uploaded data in {time.time() - s_time:.2f} sec")
+        logger.debug(f"uploaded data in {time.time() - s_time:.2f} sec")

         logger.info("performing a bulk import job")
         job = bulk_import.perform(wait=True)