Reduce memory usage in delta format code paths #723

Open · wants to merge 2 commits into main

@@ -183,6 +183,7 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
Table(name = "deletion_vectors_with_dvs_dv_property_on", schema = "default", share = "share8"),
Table(name = "dv_and_cm_table", schema = "default", share = "share8"),
Table(name = "timestampntz_cdf_table", schema = "default", share = "share8")
Table(name = "12k_rows", schema = "default", share = "share8")
)
assert(expected == client.listAllTables().toSet)
} finally {
6 changes: 4 additions & 2 deletions python/delta_sharing/delta_sharing.py
@@ -129,7 +129,8 @@ def load_as_pandas(
metadata will be used to determine whether to use delta or parquet format.
:param convert_in_batches: Whether to convert the parquet files to pandas one batch at a time
rather than one file at a time. This may reduce memory consumption at the cost of taking
longer and downloading more data.
longer or downloading more data, with parquet format queries being more likely to see
improvements.
:return: A pandas DataFrame representing the shared table.
"""
profile_json, share, schema, table = _parse_url(url)
@@ -248,7 +249,8 @@ def load_table_changes_as_pandas(
format.
:param convert_in_batches: Whether to convert the parquet files to pandas one batch at a time
rather than one file at a time. This may reduce memory consumption at the cost of taking
longer and downloading more data.
longer or downloading more data, with parquet format queries being more likely to see
improvements.
:return: A pandas DataFrame representing the shared table.
"""
profile_json, share, schema, table = _parse_url(url)
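
For context, a minimal usage sketch of the convert_in_batches flag documented above; the profile path below is a placeholder, not a value from this PR:

import delta_sharing

# Hypothetical profile file; the table coordinates reuse the test table added in this PR.
url = "/path/to/profile.share#share8.default.12k_rows"

# convert_in_batches=True converts each Arrow record batch to pandas as it is
# produced instead of materializing one large pyarrow.Table first, trading some
# speed (and possibly extra downloads) for a lower peak memory footprint.
pdf = delta_sharing.load_as_pandas(url, use_delta_format=True, convert_in_batches=True)
changes = delta_sharing.load_table_changes_as_pandas(
    url, starting_version=0, use_delta_format=True, convert_in_batches=True
)
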
17 changes: 12 additions & 5 deletions python/delta_sharing/reader.py
@@ -133,8 +133,12 @@ def __to_pandas_kernel(self):
schema = scan.execute(interface).schema
return pd.DataFrame(columns=schema.names)

table = pa.Table.from_batches(scan.execute(interface))
result = table.to_pandas()
batches = scan.execute(interface)

Collaborator commented: isn't there a parameter controlling the batch behavior?

Collaborator (Author) replied: Added

if self._convert_in_batches:
pdfs = [batch.to_pandas(self_destruct=True) for batch in batches]
result = pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
else:
result = pa.Table.from_batches(batches).to_pandas(self_destruct=True)

# Apply residual limit that was not handled from server pushdown
result = result.head(self._limit)
@@ -368,12 +372,15 @@ def __table_changes_to_pandas_kernel(self, cdfOptions: CdfOptions) -> pd.DataFrame:
table, interface, min_version, max_version
).build()

scan_result = scan.execute(interface)
if num_versions_with_action == 0:
schema = scan.execute(interface).schema
schema = scan_result.schema
result = pd.DataFrame(columns=schema.names)
elif self._convert_in_batches:
pdfs = [batch.to_pandas(self_destruct=True) for batch in scan_result]
result = pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
else:
table = pa.Table.from_batches(scan.execute(interface))
result = table.to_pandas()
result = pa.Table.from_batches(scan_result).to_pandas(self_destruct=True)
finally:
# Delete the temp folder explicitly and remove the delta format from header
temp_dir.cleanup()
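
The core pattern in both reader.py code paths above: rather than collecting every record batch into one large pyarrow.Table before converting, each batch is converted to pandas with self_destruct=True and the partial frames are concatenated, so only one batch's Arrow buffers plus the accumulated pandas data are resident at a time. A self-contained sketch of the same idea, with illustrative data that is not part of the PR:

import pandas as pd
import pyarrow as pa

def batches_to_pandas(batches, convert_in_batches=True):
    # Mirrors the pattern used in reader.py: batch-at-a-time conversion with
    # self_destruct=True releases each batch's Arrow buffers as soon as it has
    # been converted, lowering peak memory at some cost in conversion speed.
    if convert_in_batches:
        pdfs = [batch.to_pandas(self_destruct=True) for batch in batches]
        if not pdfs:  # guard the empty case, which reader.py handles separately
            return pd.DataFrame()
        return pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
    # Single conversion: faster, but the full Arrow table and the resulting
    # DataFrame briefly coexist in memory.
    return pa.Table.from_batches(batches).to_pandas(self_destruct=True)

# Illustrative data: a 12,000-row table split into 1,000-row batches.
table = pa.table({"id": list(range(12_000)), "val": [n ** 0.5 for n in range(12_000)]})
pdf = batches_to_pandas(table.to_batches(max_chunksize=1_000))
assert len(pdf) == 12_000
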
108 changes: 108 additions & 0 deletions python/delta_sharing/tests/test_delta_sharing.py
@@ -96,6 +96,7 @@ def _verify_all_tables_result(tables: Sequence[Table]):
Table(name="table_wasb", share="share_azure", schema="default"),
Table(name="table_abfs", share="share_azure", schema="default"),
Table(name="table_gcs", share="share_gcp", schema="default"),
Table(name="12k_rows", share="share8", schema="default"),
Table(name="timestampntz_cdf_table", share="share8", schema="default"),
Table(name="cdf_table_cdf_enabled", share="share8", schema="default"),
Table(name="cdf_table_with_partition", share="share8", schema="default"),
@@ -1687,3 +1688,110 @@ def test_load_table_changes_as_spark(
match="Unable to import pyspark. `load_table_changes_as_spark` requires" + " PySpark.",
):
load_table_changes_as_spark("not-used")


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments",
[pytest.param("share8.default.12k_rows")],
)
def test_load_as_pandas_delta_batch_convert(
profile_path: str,
fragments: str,
):
ids = list(range(12000))
expected = pd.DataFrame(
{
"id": ids,
"name": [f"str_{n}" for n in ids],
"time": [pd.Timestamp(n * 10**5, unit="s", tz="UTC") for n in ids],
"val": [n**0.5 for n in ids],
}
)
expected["time"] = expected["time"].astype("datetime64[us, UTC]")

pdf = (
load_as_pandas(
f"{profile_path}#{fragments}", use_delta_format=True, convert_in_batches=True
)
.sort_values(by="id")
.reset_index(drop=True)
)
pd.testing.assert_frame_equal(pdf, expected)
pdf = load_as_pandas(
f"{profile_path}#{fragments}", use_delta_format=True, convert_in_batches=True, limit=500
)
assert len(pdf) == 500
pdf = load_as_pandas(
f"{profile_path}#{fragments}", use_delta_format=True, convert_in_batches=True, limit=3000
)
assert len(pdf) == 3000


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments",
[pytest.param("share8.default.12k_rows")],
)
def test_load_table_changes_as_pandas_delta_batch_convert(
profile_path: str,
fragments: str,
):
rows_to_insert = list(range(100, 1600, 100)) # adds up to 12000

version_to_timestamp = {
1: pd.Timestamp("2025-05-29T02:51:51.000+00:00"),
2: pd.Timestamp("2025-05-29T02:51:53.000+00:00"),
3: pd.Timestamp("2025-05-29T02:51:54.000+00:00"),
4: pd.Timestamp("2025-05-29T02:51:56.000+00:00"),
5: pd.Timestamp("2025-05-29T02:51:57.000+00:00"),
6: pd.Timestamp("2025-05-29T02:51:58.000+00:00"),
7: pd.Timestamp("2025-05-29T02:52:00.000+00:00"),
8: pd.Timestamp("2025-05-29T02:52:01.000+00:00"),
9: pd.Timestamp("2025-05-29T02:52:03.000+00:00"),
10: pd.Timestamp("2025-05-29T02:52:04.000+00:00"),
11: pd.Timestamp("2025-05-29T02:52:05.000+00:00"),
12: pd.Timestamp("2025-05-29T02:52:07.000+00:00"),
13: pd.Timestamp("2025-05-29T02:52:09.000+00:00"),
14: pd.Timestamp("2025-05-29T02:52:11.000+00:00"),
15: pd.Timestamp("2025-05-29T02:52:12.000+00:00"),
}

rows_inserted = 0
version = 1
expected_pdfs = []
for row_count in rows_to_insert:
ids = list(range(rows_inserted, rows_inserted + row_count))
pdf_added = pd.DataFrame(
{
"id": ids,
"name": [f"str_{n}" for n in ids],
"time": [pd.Timestamp(n * 10**5, unit="s", tz="UTC") for n in ids],
"val": [n**0.5 for n in ids],
"_change_type": "insert",
"_commit_version": version,
"_commit_timestamp": version_to_timestamp[version],
}
)
expected_pdfs.append(pdf_added)
rows_inserted += row_count
version += 1

expected = (
pd.concat(expected_pdfs, axis=0)
.sort_values(by=["_commit_timestamp", "id"])
.reset_index(drop=True)
)
expected["_commit_timestamp"] = expected["_commit_timestamp"].astype("datetime64[us, UTC]")
expected["time"] = expected["time"].astype("datetime64[us, UTC]")
pdf = (
load_table_changes_as_pandas(
f"{profile_path}#{fragments}",
starting_version=0,
use_delta_format=True,
convert_in_batches=True,
)
.sort_values(by=["_commit_timestamp", "id"])
.reset_index(drop=True)
)
pd.testing.assert_frame_equal(pdf, expected)
@@ -206,6 +206,12 @@ object TestResource {
SchemaConfig(
"default",
java.util.Arrays.asList(
TableConfig(
"12k_rows",
s"s3a://${AWS.bucket}/delta-exchange-test/12k_rows",
"00000000-0000-0000-0000-000000000096",
historyShared = true
),
TableConfig(
"timestampntz_cdf_table",
s"s3a://${AWS.bucket}/delta-exchange-test/timestampntz_cdf_table",