diff --git a/deltacat/compute/converter/__init__.py b/deltacat/compute/converter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/constants.py b/deltacat/compute/converter/constants.py new file mode 100644 index 00000000..77e4db2b --- /dev/null +++ b/deltacat/compute/converter/constants.py @@ -0,0 +1,4 @@ +DEFAULT_CONVERTER_TASK_MAX_PARALLELISM = 4096 + +# Safe limit ONLY considering CPU limit, typically 32 for a 8x-large worker +DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD = 30 diff --git a/deltacat/compute/converter/dev/single_merge_key_converter_test.py b/deltacat/compute/converter/dev/single_merge_key_converter_test.py new file mode 100644 index 00000000..472b9124 --- /dev/null +++ b/deltacat/compute/converter/dev/single_merge_key_converter_test.py @@ -0,0 +1,570 @@ +import pyarrow as pa +import uuid +import boto3 +import ray + +from typing import Optional +from deltacat.compute.converter.equality_delete_to_position_delete_session import ( + convert_equality_deletes_to_position_deletes, +) +from deltacat.compute.converter.model.convert_session_params import ConvertSessionParams + + +def get_s3_path( + bucket_name: str, + database_name: Optional[str] = None, + table_name: Optional[str] = None, +) -> str: + result_path = f"s3://{bucket_name}" + if database_name is not None: + result_path += f"/{database_name}.db" + + if table_name is not None: + result_path += f"/{table_name}" + return result_path + + +def get_bucket_name(): + return "metadata-py4j-zyiqin1" + + +def get_credential(): + boto3_session = boto3.Session() + credentials = boto3_session.get_credentials() + return credentials + + +def get_glue_catalog(): + from pyiceberg.catalog import load_catalog + + credential = get_credential() + access_key_id = credential.access_key + secret_access_key = credential.secret_key + session_token = credential.token + s3_path = get_s3_path(get_bucket_name()) + glue_catalog = load_catalog( + "glue", + **{ + "warehouse": s3_path, + "type": "glue", + "aws_access_key_id": access_key_id, + "aws_secret_access_key": secret_access_key, + "aws_session_token": session_token, + "region_name": "us-east-1", + "s3.access-key-id": access_key_id, + "s3.secret-access-key": secret_access_key, + "s3.session-token": session_token, + "s3.region": "us-east-1", + }, + ) + + return glue_catalog + + +def get_table_schema(): + from pyiceberg.schema import Schema + from pyiceberg.types import ( + NestedField, + StringType, + LongType, + ) + + return Schema( + NestedField( + field_id=1, name="partitionkey", field_type=StringType(), required=False + ), + NestedField(field_id=2, name="bucket", field_type=LongType(), required=False), + NestedField( + field_id=3, name="primarykey", field_type=StringType(), required=False + ), + NestedField( + field_id=2147483546, + name="file_path", + field_type=StringType(), + required=False, + ), + NestedField( + field_id=2147483545, name="pos", field_type=LongType(), require=False + ), + schema_id=1, + ) + + +def get_partition_spec(): + from pyiceberg.partitioning import PartitionSpec, PartitionField + from pyiceberg.transforms import IdentityTransform + + partition_field_identity = PartitionField( + source_id=1, field_id=101, transform=IdentityTransform(), name="partitionkey" + ) + partition_spec = PartitionSpec(partition_field_identity) + return partition_spec + + +def create_table_with_data_files_and_equality_deletes(table_version): + glue_catalog = get_glue_catalog() + schema = get_table_schema() + ps = get_partition_spec() + + properties = 
dict() + properties["write.format.default"] = "parquet" + properties["write.delete.mode"] = "merge-on-read" + properties["write.update.mode"] = "merge-on-read" + properties["write.merge.mode"] = "merge-on-read" + properties["format-version"] = "2" + glue_catalog.create_table( + f"testio.example_{table_version}_partitioned", + schema=schema, + partition_spec=ps, + properties=properties, + ) + + +def load_table(table_version): + glue_catalog = get_glue_catalog() + loaded_table = glue_catalog.load_table( + f"testio.example_{table_version}_partitioned" + ) + return loaded_table + + +def get_s3_file_system(): + import pyarrow + + credential = get_credential() + access_key_id = credential.access_key + secret_access_key = credential.secret_key + session_token = credential.token + return pyarrow.fs.S3FileSystem( + access_key=access_key_id, + secret_key=secret_access_key, + session_token=session_token, + ) + + +def write_pos_delete_table(tmp_path: str, data_file_path) -> str: + import pyarrow.parquet as pq + + uuid_path = uuid.uuid4() + deletes_file_path = f"{tmp_path}/deletes_{uuid_path}.parquet" + # Note: The following path should reference correct data file path to make sure positional delete are correctly applied + # Hardcoded file path for quick POC purpose + path = data_file_path + # path = "s3://metadata-py4j-zyiqin1/data_a4f15d4a-20f6-4253-9926-d01c2cfbf884.parquet" + table = pa.table( + { + "partitionkey": ["1", "1"], + "file_path": [ + path, + "s3://metadata-py4j-zyiqin1/data_edde020f-f7d4-457d-9f9a-9331b7267860.parquet", + ], + "pos": [1, 1], + } + ) + file_system = get_s3_file_system() + pq.write_table(table, deletes_file_path, filesystem=file_system) + return build_delete_data_file(f"s3://{deletes_file_path}") + + +def write_data_table( + tmp_path: str, batch_number, number_of_records, partition_value +) -> str: + import pyarrow.parquet as pq + + uuid_path = uuid.uuid4() + deletes_file_path = f"{tmp_path}/data_{uuid_path}.parquet" + table = generate_test_pyarrow_table( + batch_number=batch_number, + number_of_records=number_of_records, + partition_value=partition_value, + ) + file_system = get_s3_file_system() + pq.write_table(table, deletes_file_path, filesystem=file_system) + return build_delete_data_file(f"s3://{deletes_file_path}") + + +def build_delete_data_file(file_path): + print(f"build_delete_file_path:{file_path}") + return file_path + + +def commit_pos_delete_to_table(table, data_file_paths): + delete_s3_url = "metadata-py4j-zyiqin1" + data_files = [write_pos_delete_table(delete_s3_url, data_file_paths)] + add_delete_files(file_paths=data_files) + + +def commit_data_to_table(table, batch_number, number_of_records, partition_value): + delete_s3_url = "metadata-py4j-zyiqin1" + data_files = [ + write_data_table( + delete_s3_url, batch_number, number_of_records, partition_value + ) + ] + add_data_files(file_paths=data_files) + return data_files + + +def commit_equality_delete_to_table( + table, to_be_deleted_batch_number, partition_value, number_of_records +): + delete_s3_url = "metadata-py4j-zyiqin1" + data_files = [ + write_equality_data_table( + delete_s3_url=delete_s3_url, + to_be_deleted_batch_number=to_be_deleted_batch_number, + partition_value=partition_value, + number_of_records=number_of_records, + ) + ] + add_equality_data_files(file_paths=data_files) + return data_files + + +def write_equality_data_table( + delete_s3_url, to_be_deleted_batch_number, partition_value, number_of_records +): + import pyarrow.parquet as pq + + uuid_path = uuid.uuid4() + deletes_file_path = 
f"{delete_s3_url}/equality_delete_{uuid_path}.parquet" + table = generate_test_pyarrow_table( + batch_number=to_be_deleted_batch_number, + partition_value=partition_value, + number_of_records=number_of_records, + ) + file_system = get_s3_file_system() + pq.write_table(table, deletes_file_path, filesystem=file_system) + return build_delete_data_file(f"s3://{deletes_file_path}") + + +def generate_test_pyarrow_table(batch_number, number_of_records, partition_value): + primary_keys_iterables = range(1, number_of_records + 1, 1) + primary_keys = list( + f"pk_sequence{batch_number}_value{str(index)}" + for index in primary_keys_iterables + ) + print(f"primary_keys:{primary_keys}") + test_table = pa.table( + { + "partitionkey": [partition_value] * number_of_records, + "primarykey": primary_keys, + "bucket": [1] * number_of_records, + } + ) + return test_table + + +# commit to s3 +def parquet_files_to_positional_delete_files(io, table_metadata, file_paths): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.POSITION_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(partitionkey="111"), + # partition=Record(**{"pk": "111", "bucket": 2}), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + yield data_file + + +def produce_pos_delete_file(io, table_metadata, file_path): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.POSITION_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(partitionkey="111"), + # partition=Record(**{"pk": "111", "bucket": 2}), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + 
+ return data_file + + +def parquet_files_to_data_files(io, table_metadata, file_paths): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + # pv = Record(**{"pk": "222", "bucket": 1}) + pv = Record(**{"partitionkey": "111"}) + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=pv, + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + yield data_file + + +def add_delete_files(file_paths): + table = load_table(TABLE_VERSION) + + with table.transaction() as tx: + if table.metadata.name_mapping() is None: + table.set_properties( + **{ + "schema.name-mapping.default": table.table_metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().fast_append() as update_snapshot: + data_files = parquet_files_to_positional_delete_files( + table_metadata=table.metadata, file_paths=file_paths, io=table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + + +def add_data_files(file_paths): + table = load_table(TABLE_VERSION) + # table.refresh() + with table.transaction() as tx: + if table.metadata.name_mapping() is None: + tx.set_properties( + **{ + "schema.name-mapping.default": table.metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().fast_append() as update_snapshot: + data_files = parquet_files_to_data_files( + table_metadata=table.metadata, file_paths=file_paths, io=table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + + +def add_equality_data_files(file_paths): + table = load_table(TABLE_VERSION) + with table.transaction() as tx: + if table.metadata.name_mapping() is None: + tx.set_properties( + **{ + "schema.name-mapping.default": table.metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().fast_append() as update_snapshot: + data_files = parquet_files_to_equality_data_files( + table_metadata=table.metadata, file_paths=file_paths, io=table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + + +def parquet_files_to_equality_data_files(io, table_metadata, file_paths): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as 
input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.EQUALITY_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(partitionkey="111"), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + yield data_file + + +def scan_table(table): + print(f"scan_table result:{table.scan().to_arrow().to_pydict()}") + + +def initialize_ray(): + if not ray.is_initialized(): + ray.init(local_mode=True, ignore_reinit_error=True) + + +initialize_ray() +# Test with creating a new iceberg table +TABLE_VERSION = "38" +iceberg_table_name = f"testio.example_{TABLE_VERSION}_partitioned" +# create_table_with_data_files_and_equality_deletes(TABLE_VERSION) +table = load_table(TABLE_VERSION) + +# Using batch_number to simulate the snapshot sequence +# 1. commit equality delete batch 1, which shouldn't take into any effect as no data files is committed yet +batch_number_1 = 1 +data_file_paths = commit_equality_delete_to_table( + table, + to_be_deleted_batch_number=batch_number_1, + partition_value="1", + number_of_records=1, +) + +# 2. commit 3 records for data table batch 2 +batch_number_2 = 2 +commit_data_to_table( + table, batch_number=batch_number_2, partition_value="1", number_of_records=3 +) + +# 3. Commit 1 equality delete record for data table 2 +batch_number_3 = 3 +commit_equality_delete_to_table( + table, + to_be_deleted_batch_number=batch_number_2, + partition_value="1", + number_of_records=1, +) + +# 4. commit 3 records for data table batch 4 +batch_number_4 = 4 +commit_data_to_table( + table, batch_number=batch_number_4, partition_value="1", number_of_records=3 +) + +# 5. Commit 1 equality delete record for data table batch 4 +batch_number_5 = 5 +commit_equality_delete_to_table( + table, + to_be_deleted_batch_number=batch_number_4, + partition_value="1", + number_of_records=1, +) + +# 6. 
Commit 3 records for data table batch 6 +batch_number_6 = 6 +commit_data_to_table( + table, batch_number=batch_number_6, partition_value="1", number_of_records=3 +) +# Total records remaining should be 3 - 1 + 3 - 1 + 3 = 7 when reading with Spark + + +glue_catalog = get_glue_catalog() +convert_session_params = ConvertSessionParams.of( + { + "catalog": glue_catalog, + "iceberg_table_name": iceberg_table_name, + "iceberg_warehouse_bucket_name": get_s3_path(get_bucket_name()), + } +) + +convert_equality_deletes_to_position_deletes(params=convert_session_params) diff --git a/deltacat/compute/converter/equality_delete_to_position_delete_session.py b/deltacat/compute/converter/equality_delete_to_position_delete_session.py new file mode 100644 index 00000000..89c63a26 --- /dev/null +++ b/deltacat/compute/converter/equality_delete_to_position_delete_session.py @@ -0,0 +1,136 @@ +# from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from deltacat.utils.ray_utils.concurrency import ( + invoke_parallel, + task_resource_options_provider, +) +import ray +import functools +from deltacat.compute.converter.utils.convert_task_options import ( + convert_resource_options_provider, +) +import logging +from deltacat import logs +from collections import defaultdict +from deltacat.compute.converter.model.convert_session_params import ConvertSessionParams +from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD +from deltacat.compute.converter.steps.convert import convert +from deltacat.compute.converter.model.convert_input import ConvertInput +from deltacat.compute.converter.pyiceberg.overrides import ( + fetch_all_bucket_files, + parquet_files_dict_to_iceberg_data_files, +) +from deltacat.compute.converter.utils.converter_session_utils import ( + check_data_files_sequence_number, +) +from deltacat.compute.converter.pyiceberg.replace_snapshot import ( + commit_overwrite_snapshot, +) +from deltacat.compute.converter.pyiceberg.catalog import load_table + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def convert_equality_deletes_to_position_deletes( + params: ConvertSessionParams, **kwargs +): + """ + Convert equality delete to position delete. + Compute and memory heavy work from downloading equality delete table and compute position deletes + will be executed on Ray remote tasks. 
+ """ + + catalog = params.catalog + table_name = params.iceberg_table_name + iceberg_table = load_table(catalog, table_name) + data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files( + iceberg_table + ) + + # files_for_each_bucket: {partition_value: [(equality_delete_files_list, data_files_list, pos_delete_files_list)] + files_for_each_bucket = defaultdict(tuple) + for k, v in data_file_dict.items(): + logger.info(f"data_file: k, v:{k, v}") + for k, v in equality_delete_dict.items(): + logger.info(f"equality_delete_file: k, v:{k, v}") + for partition_value, equality_delete_file_list in equality_delete_dict.items(): + ( + result_equality_delete_file, + result_data_file, + ) = check_data_files_sequence_number( + data_files_list=data_file_dict[partition_value], + equality_delete_files_list=equality_delete_dict[partition_value], + ) + logger.info(f"result_data_file:{result_data_file}") + logger.info(f"result_equality_delete_file:{result_equality_delete_file}") + files_for_each_bucket[partition_value] = ( + result_data_file, + result_equality_delete_file, + [], + ) + + iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name + print(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}") + merge_keys = params.merge_keys + # Using table identifier fields as merge keys if merge keys not provided + if not merge_keys: + # identifier_fields = iceberg_table.schema().identifier_field_names() + identifier_fields = ["primarykey"] + convert_options_provider = functools.partial( + task_resource_options_provider, + resource_amount_provider=convert_resource_options_provider, + ) + + # TODO (zyiqin): max_parallel_data_file_download should be determined by memory requirement for each bucket. + # Specifically, for case when files for one bucket memory requirement exceed one worker node's memory limit, WITHOUT rebasing with larger hash bucket count, + # 1. We can control parallel files to download by adjusting max_parallel_data_file_download. + # 2. Implement two-layer converter tasks, with convert tasks to spin up child convert tasks. + # Note that approach 2 will ideally require shared object store to avoid download equality delete files * number of child tasks times. + max_parallel_data_file_download = DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD + + compact_small_files = params.compact_small_files + position_delete_for_multiple_data_files = ( + params.position_delete_for_multiple_data_files + ) + task_max_parallelism = params.task_max_parallelism + + def convert_input_provider(index, item): + return { + "convert_input": ConvertInput.of( + files_for_each_bucket=item, + convert_task_index=index, + iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name, + identifier_fields=identifier_fields, + compact_small_files=compact_small_files, + position_delete_for_multiple_data_files=position_delete_for_multiple_data_files, + max_parallel_data_file_download=max_parallel_data_file_download, + ) + } + + # Ray remote task: convert + # Assuming that memory consume by each bucket doesn't exceed one node's memory limit. 
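+    # Each bucket (keyed by partition value) becomes one convert task below; per-task
+    # memory is estimated from file metadata by convert_resource_options_provider.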
+ # TODO: Add split mechanism to split large buckets + convert_tasks_pending = invoke_parallel( + items=files_for_each_bucket.items(), + ray_task=convert, + max_parallelism=task_max_parallelism, + options_provider=convert_options_provider, + kwargs_provider=convert_input_provider, + ) + to_be_deleted_files_list = [] + to_be_added_files_dict_list = [] + convert_results = ray.get(convert_tasks_pending) + for convert_result in convert_results: + to_be_deleted_files_list.extend(convert_result[0].values()) + to_be_added_files_dict_list.append(convert_result[1]) + + new_position_delete_files = parquet_files_dict_to_iceberg_data_files( + io=iceberg_table.io, + table_metadata=iceberg_table.metadata, + files_dict_list=to_be_added_files_dict_list, + ) + commit_overwrite_snapshot( + iceberg_table=iceberg_table, + # equality_delete_files + data file that all rows are deleted + to_be_deleted_files_list=to_be_deleted_files_list[0], + new_position_delete_files=new_position_delete_files, + ) diff --git a/deltacat/compute/converter/model/__init__.py b/deltacat/compute/converter/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/model/convert_input.py b/deltacat/compute/converter/model/convert_input.py new file mode 100644 index 00000000..556cb2f6 --- /dev/null +++ b/deltacat/compute/converter/model/convert_input.py @@ -0,0 +1,55 @@ +from __future__ import annotations +from typing import Dict, List + + +class ConvertInput(Dict): + @staticmethod + def of( + files_for_each_bucket, + convert_task_index, + iceberg_warehouse_bucket_name, + identifier_fields, + compact_small_files, + position_delete_for_multiple_data_files, + max_parallel_data_file_download, + ) -> ConvertInput: + + result = ConvertInput() + result["files_for_each_bucket"] = files_for_each_bucket + result["convert_task_index"] = convert_task_index + result["identifier_fields"] = identifier_fields + result["iceberg_warehouse_bucket_name"] = iceberg_warehouse_bucket_name + result["compact_small_files"] = compact_small_files + result[ + "position_delete_for_multiple_data_files" + ] = position_delete_for_multiple_data_files + result["max_parallel_data_file_download"] = max_parallel_data_file_download + return result + + @property + def files_for_each_bucket(self) -> tuple: + return self["files_for_each_bucket"] + + @property + def identifier_fields(self) -> List[str]: + return self["identifier_fields"] + + @property + def convert_task_index(self) -> int: + return self["convert_task_index"] + + @property + def iceberg_warehouse_bucket_name(self) -> str: + return self["iceberg_warehouse_bucket_name"] + + @property + def compact_small_files(self) -> bool: + return self["compact_small_files"] + + @property + def position_delete_for_multiple_data_files(self) -> bool: + return self["position_delete_for_multiple_data_files"] + + @property + def max_parallel_data_file_download(self) -> int: + return self["max_parallel_data_file_download"] diff --git a/deltacat/compute/converter/model/convert_session_params.py b/deltacat/compute/converter/model/convert_session_params.py new file mode 100644 index 00000000..11eeab22 --- /dev/null +++ b/deltacat/compute/converter/model/convert_session_params.py @@ -0,0 +1,79 @@ +from __future__ import annotations +from typing import Optional, Dict +from deltacat.compute.converter.constants import DEFAULT_CONVERTER_TASK_MAX_PARALLELISM + + +class ConvertSessionParams(dict): + """ + This class represents the parameters passed to convert_ 
equality_deletes_to_position_deletes (deltacat/compute/converter/equality_delete_to_position_delete_session.py).
+    """
+
+    @staticmethod
+    def of(params: Optional[Dict]) -> ConvertSessionParams:
+        params = {} if params is None else params
+        assert params.get("catalog") is not None, "catalog is a required arg"
+        assert (
+            params.get("iceberg_table_name") is not None
+        ), "iceberg_table_name is a required arg"
+        assert (
+            params.get("iceberg_warehouse_bucket_name") is not None
+        ), "iceberg_warehouse_bucket_name is a required arg"
+        result = ConvertSessionParams(params)
+
+        result.compact_small_files = params.get("compact_small_files", False)
+        result.position_delete_for_multiple_data_files = params.get(
+            "position_delete_for_multiple_data_files", False
+        )
+        result.task_max_parallelism = params.get(
+            "task_max_parallelism", DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
+        )
+        result.merge_keys = params.get("merge_keys", None)
+        return result
+
+    @property
+    def catalog(self):
+        return self["catalog"]
+
+    @property
+    def iceberg_table_name(self) -> str:
+        return self["iceberg_table_name"]
+
+    @property
+    def iceberg_warehouse_bucket_name(self) -> str:
+        return self["iceberg_warehouse_bucket_name"]
+
+    @property
+    def compact_small_files(self) -> bool:
+        return self["compact_small_files"]
+
+    @compact_small_files.setter
+    def compact_small_files(self, compact_small_files) -> None:
+        self["compact_small_files"] = compact_small_files
+
+    @property
+    def position_delete_for_multiple_data_files(self) -> bool:
+        return self["position_delete_for_multiple_data_files"]
+
+    @position_delete_for_multiple_data_files.setter
+    def position_delete_for_multiple_data_files(
+        self, position_delete_for_multiple_data_files
+    ) -> None:
+        self[
+            "position_delete_for_multiple_data_files"
+        ] = position_delete_for_multiple_data_files
+
+    @property
+    def task_max_parallelism(self) -> int:
+        return self["task_max_parallelism"]
+
+    @task_max_parallelism.setter
+    def task_max_parallelism(self, task_max_parallelism) -> None:
+        self["task_max_parallelism"] = task_max_parallelism
+
+    @property
+    def merge_keys(self) -> str:
+        return self["merge_keys"]
+
+    @merge_keys.setter
+    def merge_keys(self, merge_keys) -> None:
+        self["merge_keys"] = merge_keys
diff --git a/deltacat/compute/converter/pyiceberg/__init__.py b/deltacat/compute/converter/pyiceberg/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/deltacat/compute/converter/pyiceberg/catalog.py b/deltacat/compute/converter/pyiceberg/catalog.py
new file mode 100644
index 00000000..e567628d
--- /dev/null
+++ b/deltacat/compute/converter/pyiceberg/catalog.py
@@ -0,0 +1,73 @@
+from typing import Optional
+
+
+def load_catalog(iceberg_catalog_name, iceberg_catalog_properties):
+    # Alias the pyiceberg loader so this wrapper does not recursively call itself
+    from pyiceberg.catalog import load_catalog as pyiceberg_load_catalog
+
+    catalog = pyiceberg_load_catalog(
+        name=iceberg_catalog_name,
+        **iceberg_catalog_properties,
+    )
+    return catalog
+
+
+def get_s3_path(
+    bucket_name: str,
+    database_name: Optional[str] = None,
+    table_name: Optional[str] = None,
+) -> str:
+    result_path = f"s3://{bucket_name}"
+    if database_name is not None:
+        result_path += f"/{database_name}.db"
+
+    if table_name is not None:
+        result_path += f"/{table_name}"
+    return result_path
+
+
+def get_bucket_name():
+    return "metadata-py4j-zyiqin1"
+
+
+def get_s3_prefix():
+    return get_s3_path(get_bucket_name())
+
+
+def get_credential():
+    import boto3
+
+    boto3_session = boto3.Session()
+    credentials = boto3_session.get_credentials()
+    return credentials
+
+
+def get_glue_catalog():
+    # # from pyiceberg.catalog.glue import GLUE_CATALOG_ENDPOINT, GlueCatalog
+    from pyiceberg.catalog import load_catalog
+ + credential = get_credential() + access_key_id = credential.access_key + secret_access_key = credential.secret_key + session_token = credential.token + # logger.info(f"session_token: {session_token}") + s3_path = get_s3_prefix() + glue_catalog = load_catalog( + "glue", + **{ + "warehouse": s3_path, + "type": "glue", + "aws_access_key_id": access_key_id, + "aws_secret_access_key": secret_access_key, + "aws_session_token": session_token, + "region_name": "us-east-1", + "s3.access-key-id": access_key_id, + "s3.secret-access-key": secret_access_key, + "s3.session-token": session_token, + "s3.region": "us-east-1", + }, + ) + + return glue_catalog + + +def load_table(catalog, table_name): + loaded_table = catalog.load_table(table_name) + return loaded_table diff --git a/deltacat/compute/converter/pyiceberg/overrides.py b/deltacat/compute/converter/pyiceberg/overrides.py new file mode 100644 index 00000000..d9e67034 --- /dev/null +++ b/deltacat/compute/converter/pyiceberg/overrides.py @@ -0,0 +1,134 @@ +from collections import defaultdict +import logging +from deltacat import logs +import pyarrow.parquet as pq + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + + data_file_content_type = DataFileContent.POSITION_DELETES + iceberg_files = [] + schema = table_metadata.schema() + for files_dict in files_dict_list: + for partition_value, file_paths in files_dict.items(): + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan( + schema, table_metadata.properties + ), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=data_file_content_type, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=partition_value, + # partition=Record(**{"pk": "111", "bucket": 2}), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + iceberg_files.append(data_file) + return iceberg_files + + +def fetch_all_bucket_files(table): + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + from pyiceberg.typedef import ( + KeyDefaultDict, + ) + + data_scan = table.scan() + snapshot = data_scan.snapshot() + if not snapshot: + return iter([]) + manifest_evaluators = KeyDefaultDict(data_scan._build_manifest_evaluator) + + manifests = [ + manifest_file + for manifest_file in snapshot.manifests(data_scan.io) + if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + ] + + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file + from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator + from pyiceberg.types import ( + 
strtobool, + ) + from pyiceberg.table import _min_sequence_number, _open_manifest + from pyiceberg.utils.concurrent import ExecutorFactory + from itertools import chain + from pyiceberg.manifest import DataFileContent + + partition_evaluators = KeyDefaultDict(data_scan._build_partition_evaluator) + metrics_evaluator = _InclusiveMetricsEvaluator( + data_scan.table_metadata.schema(), + data_scan.row_filter, + data_scan.case_sensitive, + strtobool(data_scan.options.get("include_empty_files", "false")), + ).eval + + min_sequence_number = _min_sequence_number(manifests) + + # {"bucket_index": List[DataFile]} + data_entries = defaultdict(list) + equality_data_entries = defaultdict(list) + positional_delete_entries = defaultdict(list) + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + ( + data_scan.io, + manifest, + partition_evaluators[manifest.partition_spec_id], + metrics_evaluator, + ) + for manifest in manifests + if data_scan._check_sequence_number(min_sequence_number, manifest) + ], + ) + ): + data_file = manifest_entry.data_file + file_sequence_number = manifest_entry.sequence_number + data_file_tuple = (file_sequence_number, data_file) + partition_value = data_file.partition + if data_file.content == DataFileContent.DATA: + data_entries[partition_value].append(data_file_tuple) + if data_file.content == DataFileContent.POSITION_DELETES: + positional_delete_entries[partition_value].append(data_file_tuple) + elif data_file.content == DataFileContent.EQUALITY_DELETES: + equality_data_entries[partition_value].append(data_file_tuple) + else: + logger.warning( + f"Unknown DataFileContent ({data_file.content}): {manifest_entry}" + ) + return data_entries, equality_data_entries, positional_delete_entries diff --git a/deltacat/compute/converter/pyiceberg/replace_snapshot.py b/deltacat/compute/converter/pyiceberg/replace_snapshot.py new file mode 100644 index 00000000..5f8f88a8 --- /dev/null +++ b/deltacat/compute/converter/pyiceberg/replace_snapshot.py @@ -0,0 +1,174 @@ +from typing import Optional, List +import uuid +from pyiceberg.table.snapshots import ( + Operation, +) +from pyiceberg.manifest import ( + DataFileContent, + ManifestEntry, + ManifestEntryStatus, + ManifestFile, + write_manifest, +) +import itertools +from pyiceberg.utils.concurrent import ExecutorFactory +from pyiceberg.table import UpdateSnapshot, _SnapshotProducer + + +class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]): + """Overwrites data from the table. This will produce an OVERWRITE snapshot. + + Data and delete files were added and removed in a logical overwrite operation. 
+ """ + + def _existing_manifests(self) -> List[ManifestFile]: + """Determine if there are any existing manifest files.""" + existing_files = [] + snapshot = self._transaction.table_metadata.current_snapshot() + if snapshot: + for manifest_file in snapshot.manifests(io=self._io): + entries = manifest_file.fetch_manifest_entry( + io=self._io, discard_deleted=True + ) + + found_deleted_data_files = [ + entry.data_file + for entry in entries + if entry.data_file in self._deleted_data_files + ] + + if len(found_deleted_data_files) == 0: + existing_files.append(manifest_file) + else: + # We have to replace the manifest file without the deleted data files + if any( + entry.data_file not in found_deleted_data_files + for entry in entries + ): + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[ + manifest_file.partition_spec_id + ], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, + ) as writer: + [ + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + for entry in entries + if entry.data_file not in found_deleted_data_files + ] + existing_files.append(writer.to_manifest_file()) + return existing_files + + def _deleted_entries(self) -> List[ManifestEntry]: + """To determine if we need to record any deleted entries. + + With a full overwrite all the entries are considered deleted. + With partial overwrites we have to use the predicate to evaluate + which entries are affected. + """ + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id( + self._parent_snapshot_id + ) + if previous_snapshot is None: + # This should never happen since you cannot overwrite an empty table + raise ValueError( + f"Could not find the previous snapshot: {self._parent_snapshot_id}" + ) + + executor = ExecutorFactory.get_or_create() + + def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: + return [ + ManifestEntry( + status=ManifestEntryStatus.DELETED, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + for entry in manifest.fetch_manifest_entry( + self._io, discard_deleted=True + ) + if entry.data_file.content == DataFileContent.DATA + and entry.data_file in self._deleted_data_files + ] + + list_of_entries = executor.map( + _get_entries, previous_snapshot.manifests(self._io) + ) + return list(itertools.chain(*list_of_entries)) + else: + return [] + + +def replace( + self, + commit_uuid: Optional[uuid.UUID] = None, + using_starting_sequence: Optional[bool] = False, +) -> _ReplaceFiles: + print( + f"tansaction_current_snapshot:{self._transaction.table_metadata.current_snapshot()}" + ) + return _ReplaceFiles( + commit_uuid=commit_uuid, + operation=Operation.REPLACE + if self._transaction.table_metadata.current_snapshot() is not None + else Operation.APPEND, + transaction=self._transaction, + io=self._io, + snapshot_properties=self._snapshot_properties, + using_starting_sequence=using_starting_sequence, + ) + + +UpdateSnapshot.replace = replace + + +def commit_replace_snapshot( + iceberg_table, to_be_deleted_files_list, new_position_delete_files +): + tx = iceberg_table.transaction() + snapshot_properties = 
{} + commit_uuid = uuid.uuid4() + update_snapshot = tx.update_snapshot(snapshot_properties=snapshot_properties) + replace_snapshot = replace( + self=update_snapshot, commit_uuid=commit_uuid, using_starting_sequence=False + ) + for to_be_deleted_file in to_be_deleted_files_list: + replace_snapshot.append_data_file(to_be_deleted_file) + for to_be_added_file in new_position_delete_files: + replace_snapshot.delete_data_file(to_be_added_file) + replace_snapshot._commit() + tx.commit_transaction() + + +def commit_overwrite_snapshot( + iceberg_table, to_be_deleted_files_list, new_position_delete_files +): + commit_uuid = uuid.uuid4() + with iceberg_table.transaction() as tx: + if iceberg_table.metadata.name_mapping() is None: + iceberg_table.set_properties( + **{ + "schema.name-mapping.default": iceberg_table.table_metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().overwrite( + commit_uuid=commit_uuid + ) as overwrite_snapshot: + for data_file in new_position_delete_files: + overwrite_snapshot.append_data_file(data_file) + for original_data_file in to_be_deleted_files_list: + overwrite_snapshot.delete_data_file(original_data_file) diff --git a/deltacat/compute/converter/steps/__init__.py b/deltacat/compute/converter/steps/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/steps/convert.py b/deltacat/compute/converter/steps/convert.py new file mode 100644 index 00000000..a39aa514 --- /dev/null +++ b/deltacat/compute/converter/steps/convert.py @@ -0,0 +1,187 @@ +import pyarrow.compute as pc +import deltacat.compute.converter.utils.iceberg_columns as sc +import pyarrow as pa +import numpy as np +import daft +from collections import defaultdict +import ray +import logging +from deltacat.compute.converter.model.convert_input import ConvertInput +from deltacat.compute.converter.utils.s3u import upload_table_with_retry +from deltacat import logs + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +@ray.remote +def convert(convert_input: ConvertInput): + files_for_each_bucket = convert_input.files_for_each_bucket + convert_task_index = convert_input.convert_task_index + iceberg_warehouse_bucket_name = convert_input.iceberg_warehouse_bucket_name + identifier_fields = convert_input.identifier_fields + compact_small_files = convert_input.compact_small_files + position_delete_for_multiple_data_files = ( + convert_input.position_delete_for_multiple_data_files + ) + max_parallel_data_file_download = convert_input.max_parallel_data_file_download + + if position_delete_for_multiple_data_files: + raise NotImplementedError( + f"Distributed file level position delete compute is not supported yet" + ) + if compact_small_files: + raise NotImplementedError(f"Compact previous position delete not supported yet") + + logger.info(f"Starting convert task index: {convert_task_index}") + data_files, equality_delete_files, position_delete_files = files_for_each_bucket[1] + partition_value = files_for_each_bucket[0] + ( + to_be_deleted_files_list, + to_be_added_files_list, + ) = compute_pos_delete_with_limited_parallelism( + data_files_list=data_files, + identifier_columns=identifier_fields, + equality_delete_files_list=equality_delete_files, + iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name, + max_parallel_data_file_download=max_parallel_data_file_download, + ) + to_be_delete_files_dict = defaultdict() + to_be_delete_files_dict[partition_value] = to_be_deleted_files_list + to_be_added_files_dict = defaultdict() 
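+    # Both result dicts are keyed by partition value so the driver can aggregate per-bucket outputs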
+    to_be_added_files_dict[partition_value] = to_be_added_files_list
+    return (to_be_delete_files_dict, to_be_added_files_dict)
+
+
+def filter_rows_to_be_deleted(
+    equality_delete_table, data_file_table, identifier_columns
+):
+    # Guard against empty inputs so positional_delete_table is always defined below
+    if not equality_delete_table or not data_file_table:
+        return False, None
+    equality_deletes = pc.is_in(
+        data_file_table["primarykey"],
+        equality_delete_table["primarykey"],
+    )
+    positional_delete_table = data_file_table.filter(equality_deletes)
+    logger.info(f"positional_delete_table:{positional_delete_table.to_pydict()}")
+    logger.info(f"data_file_table:{data_file_table.to_pydict()}")
+    logger.info(
+        f"length_pos_delete_table, {len(positional_delete_table)}, length_data_table:{len(data_file_table)}"
+    )
+    if positional_delete_table:
+        positional_delete_table = positional_delete_table.drop(["primarykey"])
+    if len(positional_delete_table) == len(data_file_table):
+        return True, None
+    return False, positional_delete_table
+
+
+def compute_pos_delete(
+    equality_delete_table,
+    data_file_table,
+    identifier_columns,
+    iceberg_warehouse_bucket_name,
+):
+    delete_whole_file, new_position_delete_table = filter_rows_to_be_deleted(
+        data_file_table=data_file_table,
+        equality_delete_table=equality_delete_table,
+        identifier_columns=identifier_columns,
+    )
+    if new_position_delete_table:
+        logger.info(f"compute_pos_delete_table:{new_position_delete_table.to_pydict()}")
+        new_pos_delete_s3_link = upload_table_with_retry(
+            new_position_delete_table, iceberg_warehouse_bucket_name, {}
+        )
+        return delete_whole_file, new_pos_delete_s3_link
+    # No position deletes were produced for this set of data files
+    return delete_whole_file, None
+
+
+def download_bucketed_table(data_files, equality_delete_files):
+    from deltacat.utils.pyarrow import s3_file_to_table
+
+    compacted_table = s3_file_to_table(
+        [data_file.file_path for data_file in data_files]
+    )
+    equality_delete_table = s3_file_to_table(
+        [eq_file.file_path for eq_file in equality_delete_files]
+    )
+    return compacted_table, equality_delete_table
+
+
+def download_data_table(data_files, columns):
+    data_tables = []
+    for file in data_files:
+        table = download_parquet_with_daft_hash_applied(
+            identify_columns=columns, file=file, s3_client_kwargs={}
+        )
+        table = table.append_column(
+            sc._FILE_PATH_COLUMN_FIELD,
+            pa.array(np.repeat(file.file_path, len(table)), sc._FILE_PATH_COLUMN_TYPE),
+        )
+        record_idx_iterator = iter(range(len(table)))
+        table = sc.append_record_idx_col(table, record_idx_iterator)
+        data_tables.append(table)
+    return pa.concat_tables(data_tables)
+
+
+def compute_pos_delete_with_limited_parallelism(
+    data_files_list,
+    identifier_columns,
+    equality_delete_files_list,
+    iceberg_warehouse_bucket_name,
+    max_parallel_data_file_download,
+):
+    to_be_deleted_file_list = []
+    to_be_added_pos_delete_file_list = []
+
+    for data_files, equality_delete_files in zip(
+        data_files_list, equality_delete_files_list
+    ):
+        data_table = download_data_table(
+            data_files=data_files, columns=identifier_columns
+        )
+        equality_delete_table = download_data_table(
+            data_files=equality_delete_files, columns=identifier_columns
+        )
+        delete_whole_file, new_pos_delete_s3_link = compute_pos_delete(
+            equality_delete_table=equality_delete_table,
+            data_file_table=data_table,
+            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            identifier_columns=identifier_columns,
+        )
+        if delete_whole_file:
+            to_be_deleted_file_list.extend(data_files)
+            to_be_deleted_file_list.extend(equality_delete_files)
+        if new_pos_delete_s3_link:
+            to_be_added_pos_delete_file_list.extend(new_pos_delete_s3_link)
+
+    
to_be_deleted_file_list.extend(equality_delete_files) + logger.info(f"convert_to_be_deleted_file_list:{to_be_deleted_file_list}") + logger.info( + f"convert_to_be_added_pos_delete_file_list:{to_be_added_pos_delete_file_list}" + ) + return to_be_deleted_file_list, to_be_added_pos_delete_file_list + + +def download_parquet_with_daft_hash_applied( + identify_columns, file, s3_client_kwargs, **kwargs +): + from daft import TimeUnit + + # TODO: Add correct read kwargs as in: + # https://github.com/ray-project/deltacat/blob/383855a4044e4dfe03cf36d7738359d512a517b4/deltacat/utils/daft.py#L97 + + coerce_int96_timestamp_unit = TimeUnit.from_str( + kwargs.get("coerce_int96_timestamp_unit", "ms") + ) + + from deltacat.utils.daft import _get_s3_io_config + + # TODO: Use Daft SHA1 hash instead to minimize probably of data corruption + io_config = _get_s3_io_config(s3_client_kwargs=s3_client_kwargs) + df = daft.read_parquet( + path=file.file_path, + io_config=io_config, + coerce_int96_timestamp_unit=coerce_int96_timestamp_unit, + ) + logger.info(f"debug_identify_columns:{identify_columns}") + df = df.select(daft.col(identify_columns[0]).hash()) + arrow_table = df.to_arrow() + return arrow_table diff --git a/deltacat/compute/converter/utils/__init__.py b/deltacat/compute/converter/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/utils/convert_task_options.py b/deltacat/compute/converter/utils/convert_task_options.py new file mode 100644 index 00000000..2d0f7532 --- /dev/null +++ b/deltacat/compute/converter/utils/convert_task_options.py @@ -0,0 +1,90 @@ +from typing import Optional, Dict +from deltacat.exceptions import RetryableError + +AVERAGE_FILE_PATH_COLUMN_SIZE_BYTES = 80 +AVERAGE_POS_COLUMN_SIZE_BYTES = 4 +XXHASH_BYTE_PER_RECORD = 8 +MEMORY_BUFFER_RATE = 1.2 + + +def estimate_fixed_hash_columns(hash_value_size_bytes_per_record, total_record_count): + return hash_value_size_bytes_per_record * total_record_count + + +def get_total_record_from_iceberg_files(iceberg_files_list): + total_record_count = 0 + for iceberg_files in iceberg_files_list: + total_record_count += sum(file.record_count for file in iceberg_files) + return total_record_count + + +def estimate_iceberg_pos_delete_additional_columns( + include_columns, num_of_record_count +): + total_additional_columns_sizes = 0 + if "file_path" in include_columns: + total_additional_columns_sizes += ( + AVERAGE_FILE_PATH_COLUMN_SIZE_BYTES * num_of_record_count + ) + elif "pos" in include_columns: + total_additional_columns_sizes += ( + AVERAGE_POS_COLUMN_SIZE_BYTES * num_of_record_count + ) + return total_additional_columns_sizes + + +def estimate_convert_remote_option_resources(data_files, equality_delete_files): + data_file_record_count = get_total_record_from_iceberg_files(data_files) + equality_delete_record_count = get_total_record_from_iceberg_files( + equality_delete_files + ) + hash_column_sizes = estimate_fixed_hash_columns( + XXHASH_BYTE_PER_RECORD, data_file_record_count + equality_delete_record_count + ) + pos_delete_sizes = estimate_iceberg_pos_delete_additional_columns( + ["file_path", "pos"], data_file_record_count + equality_delete_record_count + ) + total_memory_required = hash_column_sizes + pos_delete_sizes + return total_memory_required * MEMORY_BUFFER_RATE + + +def _get_task_options( + memory: float, + ray_custom_resources: Optional[Dict] = None, + scheduling_strategy: str = "SPREAD", +) -> Dict: + + # NOTE: With DEFAULT scheduling strategy in Ray 2.20.0, autoscaler does + # 
not spin up enough nodes fast and hence we see only approximately + # 20 tasks get scheduled out of 100 tasks in queue. Hence, we use SPREAD + # which is also ideal for merge and hash bucket tasks. + # https://docs.ray.io/en/latest/ray-core/scheduling/index.html + task_opts = { + "memory": memory, + "scheduling_strategy": scheduling_strategy, + } + + if ray_custom_resources: + task_opts["resources"] = ray_custom_resources + + task_opts["max_retries"] = 3 + + # List of possible botocore exceptions are available at + # https://github.com/boto/botocore/blob/develop/botocore/exceptions.py + task_opts["retry_exceptions"] = [RetryableError] + + print(f"estimated_memory:{memory}") + return task_opts + + +def convert_resource_options_provider(index, files_for_each_bucket): + print(f"option_files_for_each_bucket:{files_for_each_bucket}") + ( + data_files_list, + equality_delete_files_list, + position_delete_files_list, + ) = files_for_each_bucket[1] + memory_requirement = estimate_convert_remote_option_resources( + data_files_list, equality_delete_files_list + ) + return _get_task_options(memory=memory_requirement) diff --git a/deltacat/compute/converter/utils/converter_session_utils.py b/deltacat/compute/converter/utils/converter_session_utils.py new file mode 100644 index 00000000..3c5a9063 --- /dev/null +++ b/deltacat/compute/converter/utils/converter_session_utils.py @@ -0,0 +1,50 @@ +def check_data_files_sequence_number(data_files_list, equality_delete_files_list): + data_files_list.sort(key=lambda file_tuple: file_tuple[0]) + equality_delete_files_list.sort(key=lambda file_tuple: file_tuple[0]) + + equality_delete_files = [] + result_data_file = [] + + # Pointer for list data_file + data_file_pointer = 0 + + debug_equality_delete_files = [] + + # Loop through each value in equality_delete_file + for equality_file_tuple in equality_delete_files_list: + # Find all values in data_file that are smaller than val_equality + valid_values = [] + + debug_valid_values = [] + # Move data_file_pointer to the first value in data_file that is smaller than val_equality + while ( + data_file_pointer < len(data_files_list) + and data_files_list[data_file_pointer][0] < equality_file_tuple[0] + ): + valid_values.append(data_files_list[data_file_pointer][1]) + debug_valid_values.append(data_files_list[data_file_pointer]) + data_file_pointer += 1 + equality_delete_files.append(equality_file_tuple[1]) + debug_equality_delete_files.append(equality_file_tuple) + + # Append the value from equality_delete_file and the corresponding valid values from data_file + if valid_values: + result_data_file.append(valid_values) + + result_equality_delete_file = append_larger_sequence_number_data_files( + equality_delete_files + ) + + return result_equality_delete_file, result_data_file + + +def append_larger_sequence_number_data_files(data_files_list): + result = [] + # Iterate over the input list + for i in range(len(data_files_list)): + sublist = data_files_list[i:] + sublist_file_list = [] + for file in sublist: + sublist_file_list.append(file) + result.append(sublist_file_list) + return result diff --git a/deltacat/compute/converter/utils/iceberg_columns.py b/deltacat/compute/converter/utils/iceberg_columns.py new file mode 100644 index 00000000..2bd795bb --- /dev/null +++ b/deltacat/compute/converter/utils/iceberg_columns.py @@ -0,0 +1,38 @@ +import pyarrow as pa +from typing import Union + + +def _get_iceberg_col_name(suffix): + return f"{suffix}" + + +_ORDERED_RECORD_IDX_COLUMN_NAME = _get_iceberg_col_name("pos") 
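+# "pos" above and "file_path" below are the column names Iceberg reserves for position
+# delete files (field ids 2147483545 and 2147483546), matching the schema fields used
+# in the dev test script above.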
+_ORDERED_RECORD_IDX_COLUMN_TYPE = pa.int64() +_ORDERED_RECORD_IDX_COLUMN_FIELD = pa.field( + _ORDERED_RECORD_IDX_COLUMN_NAME, + _ORDERED_RECORD_IDX_COLUMN_TYPE, +) + + +def get_record_index_column_array(obj) -> Union[pa.Array, pa.ChunkedArray]: + return pa.array( + obj, + _ORDERED_RECORD_IDX_COLUMN_TYPE, + ) + + +def append_record_idx_col(table: pa.Table, ordered_record_indices) -> pa.Table: + + table = table.append_column( + _ORDERED_RECORD_IDX_COLUMN_FIELD, + get_record_index_column_array(ordered_record_indices), + ) + return table + + +_FILE_PATH_COLUMN_NAME = _get_iceberg_col_name("file_path") +_FILE_PATH_COLUMN_TYPE = pa.string() +_FILE_PATH_COLUMN_FIELD = pa.field( + _FILE_PATH_COLUMN_NAME, + _FILE_PATH_COLUMN_TYPE, +) diff --git a/deltacat/compute/converter/utils/s3u.py b/deltacat/compute/converter/utils/s3u.py new file mode 100644 index 00000000..5fe9ef4a --- /dev/null +++ b/deltacat/compute/converter/utils/s3u.py @@ -0,0 +1,129 @@ +from tenacity import ( + Retrying, + retry_if_exception_type, + stop_after_delay, + wait_random_exponential, +) +from typing import Union +from deltacat.aws.s3u import CapturedBlockWritePaths, UuidBlockWritePathProvider +from deltacat.types.tables import ( + get_table_writer, + get_table_length, + TABLE_CLASS_TO_SLICER_FUNC, +) +from typing import Optional, Dict, Any, List +from deltacat.exceptions import RetryableError +from deltacat.storage import ( + DistributedDataset, + LocalTable, +) +from deltacat.types.media import ( + ContentEncoding, + ContentType, +) +from deltacat.aws.s3u import UPLOAD_SLICED_TABLE_RETRY_STOP_AFTER_DELAY +import s3fs + + +def get_credential(): + import boto3 + + boto3_session = boto3.Session() + credentials = boto3_session.get_credentials() + return credentials + + +def get_s3_file_system(content_type): + token_holder = get_credential() + content_encoding = ContentEncoding.IDENTITY + + s3_file_system = s3fs.S3FileSystem( + key=token_holder.access_key, + secret=token_holder.secret_key, + token=token_holder.token, + s3_additional_kwargs={ + "ServerSideEncryption": "aws:kms", + # TODO: Get tagging from table properties + "ContentType": content_type.value, + "ContentEncoding": content_encoding.value, + }, + ) + return s3_file_system + + +def upload_table_with_retry( + table: Union[LocalTable, DistributedDataset], + s3_url_prefix: str, + s3_table_writer_kwargs: Optional[Dict[str, Any]], + content_type: ContentType = ContentType.PARQUET, + max_records_per_file: Optional[int] = 4000000, + **s3_client_kwargs, +) -> List[str]: + """ + Writes the given table to 1 or more S3 files and return Redshift + manifest entries describing the uploaded files. 
+ """ + retrying = Retrying( + wait=wait_random_exponential(multiplier=1, max=60), + stop=stop_after_delay(UPLOAD_SLICED_TABLE_RETRY_STOP_AFTER_DELAY), + retry=retry_if_exception_type(RetryableError), + ) + + if s3_table_writer_kwargs is None: + s3_table_writer_kwargs = {} + + s3_file_system = get_s3_file_system(content_type=content_type) + capture_object = CapturedBlockWritePaths() + block_write_path_provider = UuidBlockWritePathProvider(capture_object) + s3_table_writer_func = get_table_writer(table) + table_record_count = get_table_length(table) + if max_records_per_file is None or not table_record_count: + retrying( + fn=upload_table, + table_slices=table, + s3_base_url=f"{s3_url_prefix}", + s3_file_system=s3_file_system, + s3_table_writer_func=s3_table_writer_func, + s3_table_writer_kwargs=s3_table_writer_kwargs, + block_write_path_provider=block_write_path_provider, + content_type=content_type, + **s3_client_kwargs, + ) + else: + table_slicer_func = TABLE_CLASS_TO_SLICER_FUNC.get(type(table)) + table_slices = table_slicer_func(table, max_records_per_file) + for table_slice in table_slices: + retrying( + fn=upload_table, + table_slices=table_slice, + s3_base_url=f"{s3_url_prefix}", + s3_file_system=s3_file_system, + s3_table_writer_func=s3_table_writer_func, + s3_table_writer_kwargs=s3_table_writer_kwargs, + block_write_path_provider=block_write_path_provider, + content_type=content_type, + **s3_client_kwargs, + ) + del block_write_path_provider + write_paths = capture_object.write_paths() + return write_paths + + +def upload_table( + table_slices, + s3_base_url, + s3_file_system, + s3_table_writer_func, + block_write_path_provider, + content_type, + s3_table_writer_kwargs, +): + s3_table_writer_func( + table_slices, + s3_base_url, + s3_file_system, + block_write_path_provider, + content_type.value, + **s3_table_writer_kwargs, + ) + # TODO: Add a proper fix for block_refs and write_paths not persisting in Ray actors diff --git a/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py b/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py deleted file mode 100644 index 69d17aee..00000000 --- a/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py +++ /dev/null @@ -1,226 +0,0 @@ -from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties -from typing import Union, Optional -import invoke_parallel -import ray - - -def convert_equality_deletes_to_position_deletes( - source_table_identifier: Union[str, Identifier], - destination_table_identifier: Union[str, Identifier], - read_branch, - write_branch, - snapshot_id, - iceberg_catalog_name, - iceberg_catalog_properties, - iceberg_spec_format_version, - memory_resource_planner, - **kwargs): - """ - Convert equality delete to position delete. - Compute and memory heavy work from downloading equality delete table and compute position deletes - will be executed on Ray remote tasks. 
- """ - # Run on head node - catalog = load_catalog(iceberg_catalog_name=iceberg_catalog_name, - iceberg_catalog_properties=iceberg_catalog_properties, - ) - iceberg_table = fetch_table(catalog, source_table_identifier) - num_of_buckets = get_num_of_bucket(iceberg_table) - - from deltacat.equality_delete_to_position_delete_converter.overrides import fetch_all_bucket_files - data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(table, num_of_buckets) - - convert_options_provider = functools.partial( - task_resource_options_provider, - pg_config=params.pg_config, - resource_amount_provider=memory_resource_planner, - equality_delete_file_meta=equality_delete_dict[bucket_index], - primary_keys=params.primary_keys, - deltacat_storage=params.deltacat_storage, - deltacat_storage_kwargs=params.deltacat_storage_kwargs, - ray_custom_resources=params.ray_custom_resources, - memory_logs_enabled=params.memory_logs_enabled, - estimate_resources_params=params.estimate_resources_params, - ) - - def convert_input_provider(index, item) -> dict[str, MergeInput]: - return { - "input": ConvertInput.of( - bucket_index=bucket_index, - equality_delete_file=equality_delete_dict[index], - position_delete_file=position_delete_dict[index], - data_file_dict=data_file_dict[index], - primary_keys=params.primary_keys, - sort_keys=params.sort_keys, - compact_small_files=compact_small_files, - # copied from compactor v2 - enable_profiler=params.enable_profiler, - metrics_config=params.metrics_config, - s3_table_writer_kwargs=params.s3_table_writer_kwargs, - read_kwargs_provider=params.read_kwargs_provider, - deltacat_storage=params.deltacat_storage, - deltacat_storage_kwargs=params.deltacat_storage_kwargs, - ) - } - # Ray remote task: convert - # Assuming that memory consume by each bucket doesn't exceed one node's memory limit. 
- # TODO: Add split mechanism to split large buckets - convert_tasks_pending = invoke_parallel( - items=all_files_for_bucket.items(), - ray_task=convert, - max_parallelism=task_max_parallelism, - options_provider=convert_options_provider, - kwargs_provider=convert_input_provider, - ) - - materialized_s3_links = ray.get(convert_tasks_pending) - - from deltacat.equality_delete_to_position_delete_converter.overrides import replace - - with table.transaction() as tx: - snapshot_properties = {} - commit_uuid = uuid.uuid4() - with tx.update_snapshot(snapshot_properties=snapshot_properties).replace( - commit_uuid=commit_uuid, using_starting_sequence=True - ) as replace_snapshot: - replace_snapshot.append_data_file(materialized_s3_links) - replace_snapshot.delete_data_file(equality_delete_files) - replace_snapshot._commit() - - -@ray.remote -def convert(bucket_index, - equality_delete_file, - position_delete_file, - data_file_dict, - primary_keys, - sort_keys, - compact_small_files): - - # Download ONLY equality delete table primary keys and store in Ray Plasma store - equality_delete_table = download_bucketed_table(equality_delete_files=equality_delete_files, - columns=primary_key_columns) - - equality_delete_table_ref = ray.put(equality_delete_table) - - def pos_compute_options_provider(memory_resource_planner): - return (memory_resource_planner(data_file_primary_key_memory + previous_pos_delete_file_primary_key_memory + - file_path_column_memory + pos_column_memory)) - - - file_level_compute_tasks_pending_ref = invoke_parallel( - items=data_file_dict.items(), - ray_task=compute_pos_delete_file_level, - - max_parallelism=task_max_parallelism, - options_provider=pos_compute_options_provider, - kwargs_provider=pos_compute_input_provider, - ) - - materialzed_pos_delete_file_link = ray.get(file_level_compute_tasks_pending_ref) - - # For query performance purpose, setting lower_bound == upper bound will help filter out positional delete files, - # Code link: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L52 - set_lower_upper_bound_positional_delete_metadata(positional_delete_files) - - return materialzed_pos_delete_file_link - -@ray.remote -def compute_pos_delete_file_level(equality_delete_table_ref, data_file, previous_pos_delete_files): - # fetch equality delete table from ray plasma store - equality_delete_table = ray.get(equality_delete_table_ref) - - data_file_table = download_bucketed_table(files=data_file, - columns=primary_key_columns) - - if previous_pos_delete_files: - previous_pos_delete_table = download_bucketed_table(files=previous_pos_delete_file, - columns=primary_key_column) - - data_file_table = append_record_idx(data_file_table) - new_position_delete_table = filter_rows_to_be_deleted(data_file_table, equality_delete_table) - pos_delete_table = pa.concat_tables([previous_pos_delete_table, new_position_delete_table]) - pos_delete_table= append_data_file_path(pos_delete_table) - - # Write to S3, get S3 link back - new_positional_delete_file = materialize_positional_delete_table_to_s3(pos_delete_table) - return new_positional_delete_file - -def fetch_table(iceberg_catalog, table_identifer): - iceberg_table = iceberg_catalog.load_table(table_identifer) - return iceberg_table - - -def get_num_of_bucket(table) -> int: - from pyiceberg.transforms import BucketTransform - for partition_field in table.metadata.partition_specs[0].fields: - if isinstance(partition_field.transform, BucketTransform): - return 
partition_field.transform.num_buckets - - -def load_catalog(iceberg_catalog_name, iceberg_catalog_properties): - catalog = load_catalog( - name=iceberg_catalog_name, - **iceberg_catalog_properties, - ) - return catalog - - -def materialize_positional_delete_table_to_s3(positional_delete_table): - from deltacat.aws.s3u import upload_sliced_table - upload_sliced_table(table=positional_delete_table) - - -def download_bucketed_table(data_files, equality_delete_files): - from deltacat.utils.pyarrow import s3_file_to_table - compacted_table = s3_file_to_table(data_files) - equality_delete_table = s3_file_to_table(equality_delete_files) - return compacted_table, equality_delete_table - - -def append_record_idx(compacted_table): - return compacted_table - - -def filter_rows_to_be_deleted(compacted_table, equality_delete_table): - if compacted_table: - equality_delete_table = pc.is_in( - compacted_table[sc._PK_HASH_STRING_COLUMN_NAME], - incremental_table[sc._PK_HASH_STRING_COLUMN_NAME], - ) - - positional_delete_table = compacted_table.filter(equality_delete_table) - logger.info(f"POSITIONAL_DELETE_TABLE_LENGTH:{len(positional_delete_table)}") - - # incremental_table = _drop_delta_type_rows(incremental_table, DeltaType.DELETE) - # result_table_list.append(incremental_table) - final_table = None - if positional_delete_table: - final_table = final_table.drop([sc._PK_HASH_STRING_COLUMN_NAME]) - final_table = final_table.drop([sc._ORDERED_RECORD_IDX_COLUMN_NAME]) - final_table = final_table.drop([sc._ORDERED_FILE_IDX_COLUMN_NAME]) - - return final_table - - -def append_data_file_path(): - """ - Util function to construct positional delete files, positional delete files contains two columns: data file path, position - """ - pass - - -def set_lower_upper_bound_positional_delete_metadata(): - """ - Util function to set lower bound and upper bound of positional delete file metadata - """ - pass - - -def load_table_files(table, snapshot_id, bucket_index): - """ - load files based on bucket index, utilze hidden partitioning to filter out irrelevant files. 
- """ - # TODO: Performance testing using Daft scan task for plan partitioned files and estimated memory - row_filter = f"bucket={bucket_index}" - iceberg_tasks = table.scan(row_filter=row_filter, snapshot_id=snapshot_id).plan_files() diff --git a/deltacat/compute/equality_delete_to_position_delete_conveter/overrides.py b/deltacat/compute/equality_delete_to_position_delete_conveter/overrides.py deleted file mode 100644 index abfe297a..00000000 --- a/deltacat/compute/equality_delete_to_position_delete_conveter/overrides.py +++ /dev/null @@ -1,201 +0,0 @@ -def fetch_all_bucket_files(table, number_of_buckets): - # step 1: filter manifests using partition summaries - # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id - from pyiceberg.typedef import ( - EMPTY_DICT, - IcebergBaseModel, - IcebergRootModel, - Identifier, - KeyDefaultDict, - ) - - data_scan = table.scan() - snapshot = data_scan.snapshot() - if not snapshot: - return iter([]) - manifest_evaluators = KeyDefaultDict(data_scan._build_manifest_evaluator) - - manifests = [ - manifest_file - for manifest_file in snapshot.manifests(data_scan.io) - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) - ] - - # step 2: filter the data files in each manifest - # this filter depends on the partition spec used to write the manifest file - from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator - from pyiceberg.types import ( - strtobool, - ) - from pyiceberg.table import _min_sequence_number, _open_manifest - from pyiceberg.utils.concurrent import ExecutorFactory - from itertools import chain - from pyiceberg.manifest import DataFileContent - - partition_evaluators = KeyDefaultDict(data_scan._build_partition_evaluator) - metrics_evaluator = _InclusiveMetricsEvaluator( - data_scan.table_metadata.schema(), - data_scan.row_filter, - data_scan.case_sensitive, - strtobool(data_scan.options.get("include_empty_files", "false")), - ).eval - - min_sequence_number = _min_sequence_number(manifests) - - # {"bucket_index": List[DataFile]} - data_entries = defaultdict() - equality_data_entries = defaultdict() - positional_delete_entries = defaultdict() - - executor = ExecutorFactory.get_or_create() - for manifest_entry in chain( - *executor.map( - lambda args: _open_manifest(*args), - [ - ( - data_scan.io, - manifest, - partition_evaluators[manifest.partition_spec_id], - metrics_evaluator, - ) - for manifest in manifests - if data_scan._check_sequence_number(min_sequence_number, manifest) - ], - ) - ): - data_file = manifest_entry.data_file - if data_file.content == DataFileContent.DATA: - data_entries.append(manifest_entry) - if data_file.content == DataFileContent.POSITION_DELETES: - positional_delete_entries.add(manifest_entry) - elif data_file.content == DataFileContent.EQUALITY_DELETES: - equality_data_entries.append(data_file) - else: - logger.warning(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") - return data_entries, equality_data_entries, positional_delete_entries - -def replace(self, commit_uuid: Optional[uuid.UUID] = None, using_starting_sequence: Optional[bool] = False) -> _ReplaceFiles: - print(f"tansaction_current_snapshot:{self._transaction.table_metadata.current_snapshot()}") - return _ReplaceFiles( - commit_uuid=commit_uuid, - operation=Operation.REPLACE - if self._transaction.table_metadata.current_snapshot() is not None - else Operation.APPEND, - transaction=self._transaction, - io=self._io, - 
snapshot_properties=self._snapshot_properties, - using_starting_sequence=using_starting_sequence, - ) - -# Overrides SnapshotProducer to allow using deleted files' sequence number -def _manifests(self) -> List[ManifestFile]: - def _write_added_manifest() -> List[ManifestFile]: - replace_manifest_using_starting_sequence = None - if self.using_starting_sequence and self._operation == Operation.REPLACE and self._deleted_entries: - snapshot = self._transaction.table_metadata.current_snapshot() - for manifest_file in snapshot.manifests(io=self._io): - entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=False) - relevant_manifests = [entry for entry in entries if entry.data_file in self._deleted_data_files] - replace_manifest_using_starting_sequence = min( - entry.sequence_number for entry in relevant_manifests) - if self._added_data_files: - with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.spec(), - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, - ) as writer: - for data_file in self._added_data_files: - writer.add( - ManifestEntry( - status=ManifestEntryStatus.ADDED, - snapshot_id=self._snapshot_id, - sequence_number=replace_manifest_using_starting_sequence if replace_manifest_using_starting_sequence else None, - file_sequence_number=None, - data_file=data_file, - ) - ) - return [writer.to_manifest_file()] - else: - return [] - - -class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]): - """Overwrites data from the table. This will produce an OVERWRITE snapshot. - - Data and delete files were added and removed in a logical overwrite operation. - """ - - def _existing_manifests(self) -> List[ManifestFile]: - """Determine if there are any existing manifest files.""" - existing_files = [] - - if snapshot := self._transaction.table_metadata.current_snapshot(): - for manifest_file in snapshot.manifests(io=self._io): - entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True) - - found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files] - - if len(found_deleted_data_files) == 0: - existing_files.append(manifest_file) - else: - # We have to replace the manifest file without the deleted data files - if any(entry.data_file not in found_deleted_data_files for entry in entries): - with write_manifest( - format_version=self._transaction.table_metadata.format_version, - spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], - schema=self._transaction.table_metadata.schema(), - output_file=self.new_manifest_output(), - snapshot_id=self._snapshot_id, - ) as writer: - [ - writer.add_entry( - ManifestEntry( - status=ManifestEntryStatus.EXISTING, - snapshot_id=entry.snapshot_id, - sequence_number=entry.sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, - ) - ) - for entry in entries - if entry.data_file not in found_deleted_data_files - ] - existing_files.append(writer.to_manifest_file()) - return existing_files - - def _deleted_entries(self) -> List[ManifestEntry]: - """To determine if we need to record any deleted entries. - - With a full overwrite all the entries are considered deleted. - With partial overwrites we have to use the predicate to evaluate - which entries are affected. 
- """ - if self._parent_snapshot_id is not None: - previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) - if previous_snapshot is None: - # This should never happen since you cannot overwrite an empty table - raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") - - executor = ExecutorFactory.get_or_create() - - def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: - return [ - ManifestEntry( - status=ManifestEntryStatus.DELETED, - snapshot_id=entry.snapshot_id, - sequence_number=entry.sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, - ) - for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) - if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files - ] - - list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) - return list(itertools.chain(*list_of_entries)) - else: - return [] - - diff --git a/deltacat/utils/ray_utils/concurrency.py b/deltacat/utils/ray_utils/concurrency.py index f609f289..03566cae 100644 --- a/deltacat/utils/ray_utils/concurrency.py +++ b/deltacat/utils/ray_utils/concurrency.py @@ -58,6 +58,8 @@ def invoke_parallel( opt = {} if options_provider: + print(f"options_provider_I:{i}") + print(f"options_provider_item:{item}") opt = options_provider(i, item) if not kwargs_provider: pending_id = ray_task.options(**opt).remote(item, *args, **kwargs) diff --git a/requirements.txt b/requirements.txt index 49db9523..3d1e8d7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,11 +6,12 @@ getdaft==0.3.6 numpy == 1.21.5 pandas == 1.3.5 pyarrow == 12.0.1 -pydantic == 1.10.4 +pydantic == 2.5.0 +pyiceberg == 0.7.1 pymemcache == 4.0.0 ray >= 2.20.0,<2.31.0 redis == 4.6.0 s3fs == 2024.5.0 schedule == 1.2.0 -tenacity == 8.1.0 -typing-extensions == 4.4.0 +tenacity == 8.5.0 +typing-extensions == 4.12.2 diff --git a/setup.py b/setup.py index 7226013c..a8c7c127 100644 --- a/setup.py +++ b/setup.py @@ -40,15 +40,16 @@ def find_version(*paths): "numpy == 1.21.5", "pandas == 1.3.5", "pyarrow == 12.0.1", - "pydantic == 1.10.4", + "pydantic == 2.5.0", "ray >= 2.20.0", "s3fs == 2024.5.0", - "tenacity == 8.1.0", - "typing-extensions == 4.4.0", + "tenacity == 8.5.0", + "typing-extensions == 4.12.2", "pymemcache == 4.0.0", "redis == 4.6.0", "getdaft == 0.3.6", "schedule == 1.2.0", + "pyiceberg == 0.7.1", ], setup_requires=["wheel"], package_data={