diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala
index 9bc8d8044..814499b87 100644
--- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala
+++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala
@@ -182,7 +182,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
         Table(name = "table_with_cm_id", schema = "default", share = "share8"),
         Table(name = "deletion_vectors_with_dvs_dv_property_on", schema = "default", share = "share8"),
         Table(name = "dv_and_cm_table", schema = "default", share = "share8"),
-        Table(name = "timestampntz_cdf_table", schema = "default", share = "share8")
+        Table(name = "timestampntz_cdf_table", schema = "default", share = "share8"),
+        Table(name = "12k_rows", schema = "default", share = "share8")
       )
       assert(expected == client.listAllTables().toSet)
     } finally {
diff --git a/python/delta_sharing/delta_sharing.py b/python/delta_sharing/delta_sharing.py
index 26cf8f7fe..966fa8fe5 100644
--- a/python/delta_sharing/delta_sharing.py
+++ b/python/delta_sharing/delta_sharing.py
@@ -129,7 +129,8 @@ def load_as_pandas(
       metadata will be used to determine whether to use delta or parquet format.
     :param convert_in_batches: Whether to convert the parquet files to pandas one batch at a time
       rather than one file at a time. This may reduce memory consumption at the cost of taking
-      longer and downloading more data.
+      longer or downloading more data, with parquet format queries being more likely to see
+      improvements.
     :return: A pandas DataFrame representing the shared table.
     """
     profile_json, share, schema, table = _parse_url(url)
@@ -248,7 +249,8 @@ def load_table_changes_as_pandas(
       format.
     :param convert_in_batches: Whether to convert the parquet files to pandas one batch at a time
       rather than one file at a time. This may reduce memory consumption at the cost of taking
-      longer and downloading more data.
+      longer or downloading more data, with parquet format queries being more likely to see
+      improvements.
     :return: A pandas DataFrame representing the shared table.
     """
     profile_json, share, schema, table = _parse_url(url)
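A minimal usage sketch of the flag documented above, assuming a local profile file (the profile path below is a placeholder; the table coordinates are those of the test table added in this patch):

import delta_sharing

# Placeholder profile; any valid Delta Sharing profile file works here.
profile = "/path/to/profile.share"
url = f"{profile}#share8.default.12k_rows"

# Convert each Arrow record batch to pandas as it arrives instead of
# materializing the whole table first (implementation in reader.py below).
pdf = delta_sharing.load_as_pandas(url, use_delta_format=True, convert_in_batches=True)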
""" profile_json, share, schema, table = _parse_url(url) diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py index 57438a4f5..d021ddeb2 100644 --- a/python/delta_sharing/reader.py +++ b/python/delta_sharing/reader.py @@ -133,8 +133,13 @@ def __to_pandas_kernel(self): schema = scan.execute(interface).schema return pd.DataFrame(columns=schema.names) - table = pa.Table.from_batches(scan.execute(interface)) - result = table.to_pandas() + batches = scan.execute(interface) + if self._convert_in_batches: + pdfs = [batch.to_pandas(self_destruct=True) for batch in batches] + print(f"Received {len(pdfs)} batches of data.") + result = pd.concat(pdfs, axis=0, ignore_index=True, copy=False) + else: + result = pa.Table.from_batches(batches).to_pandas(self_destruct=True) # Apply residual limit that was not handled from server pushdown result = result.head(self._limit) @@ -368,12 +373,15 @@ def __table_changes_to_pandas_kernel(self, cdfOptions: CdfOptions) -> pd.DataFra table, interface, min_version, max_version ).build() + scan_result = scan.execute(interface) if num_versions_with_action == 0: - schema = scan.execute(interface).schema + schema = scan_result.schema result = pd.DataFrame(columns=schema.names) + elif self._convert_in_batches: + pdfs = [batch.to_pandas(self_destruct=True) for batch in scan_result] + result = pd.concat(pdfs, axis=0, ignore_index=True, copy=False) else: - table = pa.Table.from_batches(scan.execute(interface)) - result = table.to_pandas() + result = pa.Table.from_batches(scan_result).to_pandas(self_destruct=True) finally: # Delete the temp folder explicitly and remove the delta format from header temp_dir.cleanup() @@ -459,6 +467,7 @@ def _to_pandas( if limit is not None and rows_read >= limit: break + print(f"Received {len(pdfs)} batches of data.") pdf = pd.concat(pdfs, axis=0, ignore_index=True, copy=False) if limit is not None: pdf = pdf.head(limit) diff --git a/python/delta_sharing/tests/test_delta_sharing.py b/python/delta_sharing/tests/test_delta_sharing.py index 2f6ba708d..09ceef563 100644 --- a/python/delta_sharing/tests/test_delta_sharing.py +++ b/python/delta_sharing/tests/test_delta_sharing.py @@ -96,6 +96,7 @@ def _verify_all_tables_result(tables: Sequence[Table]): Table(name="table_wasb", share="share_azure", schema="default"), Table(name="table_abfs", share="share_azure", schema="default"), Table(name="table_gcs", share="share_gcp", schema="default"), + Table(name="12k_rows", share="share8", schema="default"), Table(name="timestampntz_cdf_table", share="share8", schema="default"), Table(name="cdf_table_cdf_enabled", share="share8", schema="default"), Table(name="cdf_table_with_partition", share="share8", schema="default"), @@ -1687,3 +1688,110 @@ def test_load_table_changes_as_spark( match="Unable to import pyspark. 
diff --git a/python/delta_sharing/tests/test_delta_sharing.py b/python/delta_sharing/tests/test_delta_sharing.py
index 2f6ba708d..09ceef563 100644
--- a/python/delta_sharing/tests/test_delta_sharing.py
+++ b/python/delta_sharing/tests/test_delta_sharing.py
@@ -96,6 +96,7 @@ def _verify_all_tables_result(tables: Sequence[Table]):
         Table(name="table_wasb", share="share_azure", schema="default"),
         Table(name="table_abfs", share="share_azure", schema="default"),
         Table(name="table_gcs", share="share_gcp", schema="default"),
+        Table(name="12k_rows", share="share8", schema="default"),
         Table(name="timestampntz_cdf_table", share="share8", schema="default"),
         Table(name="cdf_table_cdf_enabled", share="share8", schema="default"),
         Table(name="cdf_table_with_partition", share="share8", schema="default"),
@@ -1687,3 +1688,110 @@ def test_load_table_changes_as_spark(
         match="Unable to import pyspark. `load_table_changes_as_spark` requires" + " PySpark.",
     ):
         load_table_changes_as_spark("not-used")
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+@pytest.mark.parametrize(
+    "fragments",
+    [pytest.param("share8.default.12k_rows")],
+)
+def test_load_as_pandas_delta_batch_convert(
+    profile_path: str,
+    fragments: str,
+):
+    ids = list(range(12000))
+    expected = pd.DataFrame(
+        {
+            "id": ids,
+            "name": [f"str_{n}" for n in ids],
+            "time": [pd.Timestamp(n * 10**5, unit="s", tz="UTC") for n in ids],
+            "val": [n**0.5 for n in ids],
+        }
+    )
+    expected["time"] = expected["time"].astype("datetime64[us, UTC]")
+
+    pdf = (
+        load_as_pandas(
+            f"{profile_path}#{fragments}", use_delta_format=True, convert_in_batches=True
+        )
+        .sort_values(by="id")
+        .reset_index(drop=True)
+    )
+    pd.testing.assert_frame_equal(pdf, expected)
+    pdf = load_as_pandas(
+        f"{profile_path}#{fragments}", use_delta_format=True, convert_in_batches=True, limit=500
+    )
+    assert len(pdf) == 500
+    pdf = load_as_pandas(
+        f"{profile_path}#{fragments}", use_delta_format=True, convert_in_batches=True, limit=3000
+    )
+    assert len(pdf) == 3000
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+@pytest.mark.parametrize(
+    "fragments",
+    [pytest.param("share8.default.12k_rows")],
+)
+def test_load_table_changes_as_pandas_delta_batch_convert(
+    profile_path: str,
+    fragments: str,
+):
+    rows_to_insert = list(range(100, 1600, 100))  # adds up to 12000
+
+    version_to_timestamp = {
+        1: pd.Timestamp("2025-05-29T02:51:51.000+00:00"),
+        2: pd.Timestamp("2025-05-29T02:51:53.000+00:00"),
+        3: pd.Timestamp("2025-05-29T02:51:54.000+00:00"),
+        4: pd.Timestamp("2025-05-29T02:51:56.000+00:00"),
+        5: pd.Timestamp("2025-05-29T02:51:57.000+00:00"),
+        6: pd.Timestamp("2025-05-29T02:51:58.000+00:00"),
+        7: pd.Timestamp("2025-05-29T02:52:00.000+00:00"),
+        8: pd.Timestamp("2025-05-29T02:52:01.000+00:00"),
+        9: pd.Timestamp("2025-05-29T02:52:03.000+00:00"),
+        10: pd.Timestamp("2025-05-29T02:52:04.000+00:00"),
+        11: pd.Timestamp("2025-05-29T02:52:05.000+00:00"),
+        12: pd.Timestamp("2025-05-29T02:52:07.000+00:00"),
+        13: pd.Timestamp("2025-05-29T02:52:09.000+00:00"),
+        14: pd.Timestamp("2025-05-29T02:52:11.000+00:00"),
+        15: pd.Timestamp("2025-05-29T02:52:12.000+00:00"),
+    }
+
+    rows_inserted = 0
+    version = 1
+    expected_pdfs = []
+    for row_count in rows_to_insert:
+        ids = list(range(rows_inserted, rows_inserted + row_count))
+        pdf_added = pd.DataFrame(
+            {
+                "id": ids,
+                "name": [f"str_{n}" for n in ids],
+                "time": [pd.Timestamp(n * 10**5, unit="s", tz="UTC") for n in ids],
+                "val": [n**0.5 for n in ids],
+                "_change_type": "insert",
+                "_commit_version": version,
+                "_commit_timestamp": version_to_timestamp[version],
+            }
+        )
+        expected_pdfs.append(pdf_added)
+        rows_inserted += row_count
+        version += 1
+
+    expected = (
+        pd.concat(expected_pdfs, axis=0)
+        .sort_values(by=["_commit_timestamp", "id"])
+        .reset_index(drop=True)
+    )
+    expected["_commit_timestamp"] = expected["_commit_timestamp"].astype("datetime64[us, UTC]")
+    expected["time"] = expected["time"].astype("datetime64[us, UTC]")
+    pdf = (
+        load_table_changes_as_pandas(
+            f"{profile_path}#{fragments}",
+            starting_version=0,
+            use_delta_format=True,
+            convert_in_batches=True,
+        )
+        .sort_values(by=["_commit_timestamp", "id"])
+        .reset_index(drop=True)
+    )
+    pd.testing.assert_frame_equal(pdf, expected)
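A quick check of the fixture arithmetic the CDF test above relies on (plain arithmetic, not part of the patch):

# 15 inserts of 100, 200, ..., 1500 rows across commit versions 1-15.
rows_to_insert = list(range(100, 1600, 100))
assert len(rows_to_insert) == 15      # one insert per commit version
assert sum(rows_to_insert) == 12000   # matches the size of the 12k_rows table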
diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py
index 719604251..4c8e71fa1 100644
--- a/python/delta_sharing/tests/test_reader.py
+++ b/python/delta_sharing/tests/test_reader.py
@@ -327,17 +327,49 @@ def autoresolve_query_format(self, table: Table):
             return "parquet"
 
     expected = pd.concat([pdf1, pdf2]).reset_index(drop=True)
+    expected_100k = pdf1.head(100000)
+    expected_300k = pd.concat([pdf1, pdf2.head(100000)]).reset_index(drop=True)
 
     reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock())
     pdf = reader.to_pandas()
     pd.testing.assert_frame_equal(pdf, expected)
 
+    reader = DeltaSharingReader(
+        Table("table_name", "share_name", "schema_name"), RestClientMock(), limit=100000
+    )
+    pdf = reader.to_pandas()
+    pd.testing.assert_frame_equal(pdf, expected_100k)
+
+    reader = DeltaSharingReader(
+        Table("table_name", "share_name", "schema_name"), RestClientMock(), limit=300000
+    )
+    pdf = reader.to_pandas()
+    pd.testing.assert_frame_equal(pdf, expected_300k)
+
     reader = DeltaSharingReader(
         Table("table_name", "share_name", "schema_name"), RestClientMock(), convert_in_batches=True
     )
     pdf = reader.to_pandas()
     pd.testing.assert_frame_equal(pdf, expected)
 
+    reader = DeltaSharingReader(
+        Table("table_name", "share_name", "schema_name"),
+        RestClientMock(),
+        limit=100000,
+        convert_in_batches=True,
+    )
+    pdf = reader.to_pandas()
+    pd.testing.assert_frame_equal(pdf, expected_100k)
+
+    reader = DeltaSharingReader(
+        Table("table_name", "share_name", "schema_name"),
+        RestClientMock(),
+        limit=300000,
+        convert_in_batches=True,
+    )
+    pdf = reader.to_pandas()
+    pd.testing.assert_frame_equal(pdf, expected_300k)
+
 
 @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
 def test_to_pandas_empty(rest_client: DataSharingRestClient):
diff --git a/server/src/test/scala/io/delta/sharing/server/TestResource.scala b/server/src/test/scala/io/delta/sharing/server/TestResource.scala
index 4e04c0902..5fb41d95e 100644
--- a/server/src/test/scala/io/delta/sharing/server/TestResource.scala
+++ b/server/src/test/scala/io/delta/sharing/server/TestResource.scala
@@ -206,6 +206,12 @@ object TestResource {
       SchemaConfig(
         "default",
         java.util.Arrays.asList(
+          TableConfig(
+            "12k_rows",
+            s"s3a://${AWS.bucket}/delta-exchange-test/12k_rows",
+            "00000000-0000-0000-0000-000000000096",
+            historyShared = true
+          ),
          TableConfig(
            "timestampntz_cdf_table",
            s"s3a://${AWS.bucket}/delta-exchange-test/timestampntz_cdf_table",
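The new limit cases in test_reader.py above exercise the reader's existing accumulate-then-trim handling of limits the server did not fully push down. A sketch of that pattern (the helper name and the per-file DataFrame list are illustrative, not part of the patch):

import pandas as pd

def _concat_with_limit(per_file_pdfs, limit=None):
    # Buffer converted files until enough rows have been read, then trim the
    # residual overshoot with head(), as the reader does after its file loop.
    pdfs, rows_read = [], 0
    for pdf in per_file_pdfs:
        pdfs.append(pdf)
        rows_read += len(pdf)
        if limit is not None and rows_read >= limit:
            break
    if not pdfs:
        return pd.DataFrame()  # nothing to concatenate
    result = pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
    return result if limit is None else result.head(limit)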