diff --git a/pytd/writer.py b/pytd/writer.py index d490eb4..a53508a 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -452,19 +452,29 @@ def write_dataframe( _replace_pd_na(dataframe) num_rows = len(dataframe) # chunk number of records should not exceed 200 to avoid OSError - _chunk_record_size = max(chunk_record_size, num_rows//200) + _chunk_record_size = max(chunk_record_size, num_rows // 200) try: - for start in range(0, num_rows, _chunk_record_size): - records = dataframe.iloc[ - start : start + _chunk_record_size - ].to_dict(orient="records") - fp = tempfile.NamedTemporaryFile( - suffix=".msgpack.gz", delete=False - ) - fp = self._write_msgpack_stream(records, fp) - fps.append(fp) - stack.callback(os.unlink, fp.name) - stack.callback(fp.close) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for start in range(0, num_rows, _chunk_record_size): + records = dataframe.iloc[ + start : start + _chunk_record_size + ].to_dict(orient="records") + fp = tempfile.NamedTemporaryFile( + suffix=".msgpack.gz", delete=False + ) + futures.append( + ( + start, + executor.submit( + self._write_msgpack_stream, records, fp + ), + ) + ) + stack.callback(os.unlink, fp.name) + stack.callback(fp.close) + for start, future in sorted(futures): + fps.append(future.result()) except OSError as e: raise RuntimeError( "failed to create a temporary file. "