Add parallel upload for BulkImportWriter #134
Conversation
Force-pushed from 8714d12 to 175e3d8
This change is to avoid the need to keep the entire msgpack in memory, which can be a problem for large data sets.
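As an illustration of that idea (not the PR's actual code), here is a minimal sketch of writing records as gzipped msgpack chunks into temporary files so the full payload never has to sit in memory at once; the helper name and chunk_record_size default are placeholders.

# Sketch only: chunk records into temporary gzipped msgpack files instead of
# building one large msgpack buffer in memory. write_msgpack_chunks is an
# illustrative name, not pytd API.
import gzip
import tempfile

import msgpack


def write_msgpack_chunks(records, chunk_record_size=10_000):
    """Yield one temporary file object per gzipped msgpack chunk."""
    for start in range(0, len(records), chunk_record_size):
        fp = tempfile.TemporaryFile()
        with gzip.GzipFile(mode="wb", fileobj=fp) as gz:
            packer = msgpack.Packer()
            for record in records[start:start + chunk_record_size]:
                gz.write(packer.pack(record))
        # fp is intentionally left positioned at the end so the caller can
        # read the part size with fp.tell() before rewinding for upload
        yield fp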
Force-pushed from b8c6758 to 80ed76b
Force-pushed from bc69372 to bfe2233
pytd/writer.py
Outdated
@@ -449,7 +482,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv"):
         table : :class:`pytd.table.Table`
             Target table.

-        file_like : File like object
+        file_like : List of file like objects
Suggested change:
-        file_like : List of file like objects
+        file_likes : List of file like objects
pytd/writer.py
Outdated
             stack.close()

-    def _bulk_import(self, table, file_like, if_exists, fmt="csv"):
+    def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5):
Suggested change:
-    def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5):
+    def _bulk_import(self, table, file_likes, if_exists, fmt="csv", max_workers=5):
pytd/writer.py
Outdated
             # To skip API._prepare_file(), which recreate msgpack again.
-            bulk_import.upload_part("part", file_like, size)
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                for i, fp in enumerate(file_like):
Suggested change:
-                for i, fp in enumerate(file_like):
+                for i, fp in enumerate(file_likes):
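For context, a rough sketch of how such a parallel upload loop could look; it follows the lines quoted above but is not the PR's exact code, and the part-size handling here is an assumption.

# Rough sketch of the parallel part upload. bulk_import is assumed to be a
# tdclient bulk-import handle with upload_part(name, fp, size); file_likes is
# the list of chunk files prepared earlier.
import os
from concurrent.futures import ThreadPoolExecutor


def upload_parts(bulk_import, file_likes, max_workers=5):
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, fp in enumerate(file_likes):
            fp.seek(0, os.SEEK_END)  # measure the chunk size
            size = fp.tell()
            fp.seek(0)               # rewind before handing it to a worker
            futures.append(
                executor.submit(bulk_import.upload_part, "part-{}".format(i), fp, size)
            )
    # leaving the with-block waits for all workers; re-raise any upload error
    for future in futures:
        future.result()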
pytd/writer.py
Outdated
             else:
-                bulk_import.upload_file("part", fmt, file_like)
+                fp = file_like[0]
Suggested change:
-                fp = file_like[0]
+                fp = file_likes[0]
@@ -542,7 +591,9 @@ def _write_msgpack_stream(self, items, stream):
                 mp = packer.pack(normalized_msgpack(item))
                 gz.write(mp)

-        stream.seek(0)
Did this seek(0) become unnecessary?
Yes, it's moved here: https://github.com/treasure-data/pytd/pull/134/files#diff-7e3490636bfbd0197f47ef9694662f2621beb294fb178b5185c8f257082c53e7R533
This is because, to get the file size via stream.tell() properly, the seek(0) has to happen later.
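A small illustration of that ordering constraint (names paraphrased from the diff, not exact pytd code):

# tell() only reports the written size while the stream is still positioned
# at the end of the data, so seek(0) must come after the size is captured.
def finalize_stream(stream):
    size = stream.tell()  # bytes written so far
    stream.seek(0)        # rewind afterwards, ready for upload
    return size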
                    df.to_dict(orient="records"), fp
                )
                api_client.create_bulk_import().upload_part.assert_called_with(
                    "part-0", ANY, 62
Maybe I missed something, but where does this 62 come from?
I tried to get the file size from fp, but I gave up because it doesn't match the actual file size produced in the _write_msgpack_stream() method.
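For readers wondering how such a byte count arises, the sketch below shows one way to measure the size of a gzipped msgpack stream for a small set of records; it omits pytd's record normalization, so the exact number asserted in the test (62) is specific to that fixture and not reproduced by this snippet.

# Illustrative only: measure how many bytes a gzipped msgpack stream occupies.
# pytd's normalized_msgpack() step is omitted, so numbers will differ from the test.
import gzip
import io

import msgpack


def gzipped_msgpack_size(records):
    buf = io.BytesIO()
    with gzip.GzipFile(mode="wb", fileobj=buf) as gz:
        packer = msgpack.Packer()
        for record in records:
            gz.write(packer.pack(record))
    return buf.tell()  # total bytes written, analogous to the size asserted above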
Co-authored-by: Roman Shtykh <[email protected]>
This patch introduces parallel upload capability to BulkImportWriter. It splits the data into chunks of 10,000 records and uploads them in parallel. It also reduces memory consumption for msgpack-format uploads.
Upload speed becomes about 4.4x faster, and memory consumption can be controlled by using temporary files and the chunk_record_size parameter.
Benchmark details (collapsed in this view; a hedged reconstruction follows below):
- Dummy data creation for benchmark
- Upload with a single thread
- Upload with 6 threads
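The benchmark blocks above are not expanded here, so the following is an assumed reconstruction of the kind of script they describe. Table names, data size, and the assumption that fmt and max_workers are forwarded from load_table_from_dataframe to BulkImportWriter.write_dataframe are placeholders, not taken from the PR.

# Assumed benchmark sketch, not the author's actual script.
import numpy as np
import pandas as pd
import pytd

# dummy data creation for the benchmark (size is a placeholder)
df = pd.DataFrame(np.random.rand(1_000_000, 4), columns=["a", "b", "c", "d"])

client = pytd.Client(database="sandbox")  # API key read from TD_API_KEY

# upload with a single thread
client.load_table_from_dataframe(
    df, "benchmark_single", writer="bulk_import", if_exists="overwrite",
    fmt="msgpack", max_workers=1,
)

# upload with 6 threads
client.load_table_from_dataframe(
    df, "benchmark_parallel", writer="bulk_import", if_exists="overwrite",
    fmt="msgpack", max_workers=6,
)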