From 9d15ee526f25971c496c0c6db83f4ebd70d51343 Mon Sep 17 00:00:00 2001 From: Zyiqin-Miranda <79943187+Zyiqin-Miranda@users.noreply.github.com> Date: Sun, 26 Jan 2025 20:56:04 -0800 Subject: [PATCH] [Tests passing] [2.0] Add initial eq-to-pos delete job (#356) * [WIP] Add eq-to-pos delete job session draft * Update with producing file level pos deletes * Resolve dependency conflicts with 2.0 branch * Add more documentation to example; code cleanup * Bump linter version to fix linter + reformatting --- deltacat/compute/converter/__init__.py | 0 .../example_single_merge_key_converter.py | 554 ++++++++++++++++++ deltacat/compute/converter/constants.py | 4 + .../compute/converter/converter_session.py | 143 +++++ deltacat/compute/converter/model/__init__.py | 0 .../compute/converter/model/convert_input.py | 55 ++ .../model/converter_session_params.py | 81 +++ .../compute/converter/pyiceberg/__init__.py | 0 .../compute/converter/pyiceberg/catalog.py | 75 +++ .../compute/converter/pyiceberg/overrides.py | 134 +++++ .../converter/pyiceberg/replace_snapshot.py | 174 ++++++ deltacat/compute/converter/steps/__init__.py | 0 deltacat/compute/converter/steps/convert.py | 188 ++++++ deltacat/compute/converter/utils/__init__.py | 0 .../converter/utils/convert_task_options.py | 90 +++ .../utils/converter_session_utils.py | 50 ++ .../converter/utils/iceberg_columns.py | 38 ++ deltacat/compute/converter/utils/s3u.py | 129 ++++ .../writer/test_memtable_dataset_writer.py | 22 +- 19 files changed, 1729 insertions(+), 8 deletions(-) create mode 100644 deltacat/compute/converter/__init__.py create mode 100644 deltacat/compute/converter/_dev/example_single_merge_key_converter.py create mode 100644 deltacat/compute/converter/constants.py create mode 100644 deltacat/compute/converter/converter_session.py create mode 100644 deltacat/compute/converter/model/__init__.py create mode 100644 deltacat/compute/converter/model/convert_input.py create mode 100644 deltacat/compute/converter/model/converter_session_params.py create mode 100644 deltacat/compute/converter/pyiceberg/__init__.py create mode 100644 deltacat/compute/converter/pyiceberg/catalog.py create mode 100644 deltacat/compute/converter/pyiceberg/overrides.py create mode 100644 deltacat/compute/converter/pyiceberg/replace_snapshot.py create mode 100644 deltacat/compute/converter/steps/__init__.py create mode 100644 deltacat/compute/converter/steps/convert.py create mode 100644 deltacat/compute/converter/utils/__init__.py create mode 100644 deltacat/compute/converter/utils/convert_task_options.py create mode 100644 deltacat/compute/converter/utils/converter_session_utils.py create mode 100644 deltacat/compute/converter/utils/iceberg_columns.py create mode 100644 deltacat/compute/converter/utils/s3u.py diff --git a/deltacat/compute/converter/__init__.py b/deltacat/compute/converter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/_dev/example_single_merge_key_converter.py b/deltacat/compute/converter/_dev/example_single_merge_key_converter.py new file mode 100644 index 00000000..27bae9a4 --- /dev/null +++ b/deltacat/compute/converter/_dev/example_single_merge_key_converter.py @@ -0,0 +1,554 @@ +import pyarrow as pa +import uuid +import boto3 +import ray + +from typing import Optional +from deltacat.compute.converter.converter_session import ( + converter_session, +) +from deltacat.compute.converter.model.converter_session_params import ( + ConverterSessionParams, +) + + +def get_s3_path( + bucket_name: str, + 
database_name: Optional[str] = None, + table_name: Optional[str] = None, +) -> str: + result_path = f"s3://{bucket_name}" + if database_name is not None: + result_path += f"/{database_name}.db" + + if table_name is not None: + result_path += f"/{table_name}" + return result_path + + +def get_bucket_name(): + return "metadata-py4j-zyiqin1" + + +def get_credential(): + boto3_session = boto3.Session() + credentials = boto3_session.get_credentials() + return credentials + + +def get_glue_catalog(): + from pyiceberg.catalog import load_catalog + + credential = get_credential() + access_key_id = credential.access_key + secret_access_key = credential.secret_key + session_token = credential.token + s3_path = get_s3_path(get_bucket_name()) + glue_catalog = load_catalog( + "glue", + **{ + "warehouse": s3_path, + "type": "glue", + "aws_access_key_id": access_key_id, + "aws_secret_access_key": secret_access_key, + "aws_session_token": session_token, + "region_name": "us-east-1", + "s3.access-key-id": access_key_id, + "s3.secret-access-key": secret_access_key, + "s3.session-token": session_token, + "s3.region": "us-east-1", + }, + ) + + return glue_catalog + + +def get_table_schema(): + from pyiceberg.schema import Schema + from pyiceberg.types import ( + NestedField, + StringType, + LongType, + ) + + return Schema( + NestedField( + field_id=1, name="partitionkey", field_type=StringType(), required=False + ), + NestedField(field_id=2, name="bucket", field_type=LongType(), required=False), + NestedField( + field_id=3, name="primarykey", field_type=StringType(), required=False + ), + NestedField( + field_id=2147483546, + name="file_path", + field_type=StringType(), + required=False, + ), + NestedField( + field_id=2147483545, name="pos", field_type=LongType(), require=False + ), + schema_id=1, + ) + + +def get_partition_spec(): + from pyiceberg.partitioning import PartitionSpec, PartitionField + from pyiceberg.transforms import IdentityTransform + + partition_field_identity = PartitionField( + source_id=1, field_id=101, transform=IdentityTransform(), name="partitionkey" + ) + partition_spec = PartitionSpec(partition_field_identity) + return partition_spec + + +def create_table_with_data_files_and_equality_deletes(table_version): + glue_catalog = get_glue_catalog() + schema = get_table_schema() + ps = get_partition_spec() + + properties = dict() + properties["write.format.default"] = "parquet" + properties["write.delete.mode"] = "merge-on-read" + properties["write.update.mode"] = "merge-on-read" + properties["write.merge.mode"] = "merge-on-read" + properties["format-version"] = "2" + glue_catalog.create_table( + f"testio.example_{table_version}_partitioned", + schema=schema, + partition_spec=ps, + properties=properties, + ) + + +def load_table(table_version): + glue_catalog = get_glue_catalog() + loaded_table = glue_catalog.load_table( + f"testio.example_{table_version}_partitioned" + ) + return loaded_table + + +def get_s3_file_system(): + import pyarrow + + credential = get_credential() + access_key_id = credential.access_key + secret_access_key = credential.secret_key + session_token = credential.token + return pyarrow.fs.S3FileSystem( + access_key=access_key_id, + secret_key=secret_access_key, + session_token=session_token, + ) + + +def write_data_table( + tmp_path: str, batch_number, number_of_records, partition_value +) -> str: + import pyarrow.parquet as pq + + uuid_path = uuid.uuid4() + deletes_file_path = f"{tmp_path}/data_{uuid_path}.parquet" + table = generate_test_pyarrow_table( + 
batch_number=batch_number, + number_of_records=number_of_records, + partition_value=partition_value, + ) + file_system = get_s3_file_system() + pq.write_table(table, deletes_file_path, filesystem=file_system) + return build_delete_data_file(f"s3://{deletes_file_path}") + + +def build_delete_data_file(file_path): + print(f"build_delete_file_path:{file_path}") + return file_path + + +def commit_data_to_table(table, batch_number, number_of_records, partition_value): + delete_s3_url = "metadata-py4j-zyiqin1" + data_files = [ + write_data_table( + delete_s3_url, batch_number, number_of_records, partition_value + ) + ] + add_data_files(file_paths=data_files) + return data_files + + +def commit_equality_delete_to_table( + table, to_be_deleted_batch_number, partition_value, number_of_records +): + delete_s3_url = "metadata-py4j-zyiqin1" + data_files = [ + write_equality_data_table( + delete_s3_url=delete_s3_url, + to_be_deleted_batch_number=to_be_deleted_batch_number, + partition_value=partition_value, + number_of_records=number_of_records, + ) + ] + add_equality_data_files(file_paths=data_files) + return data_files + + +def write_equality_data_table( + delete_s3_url, to_be_deleted_batch_number, partition_value, number_of_records +): + import pyarrow.parquet as pq + + uuid_path = uuid.uuid4() + deletes_file_path = f"{delete_s3_url}/equality_delete_{uuid_path}.parquet" + table = generate_test_pyarrow_table( + batch_number=to_be_deleted_batch_number, + partition_value=partition_value, + number_of_records=number_of_records, + ) + file_system = get_s3_file_system() + pq.write_table(table, deletes_file_path, filesystem=file_system) + return build_delete_data_file(f"s3://{deletes_file_path}") + + +def generate_test_pyarrow_table(batch_number, number_of_records, partition_value): + primary_keys_iterables = range(1, number_of_records + 1, 1) + primary_keys = list( + f"pk_sequence{batch_number}_value{str(index)}" + for index in primary_keys_iterables + ) + print(f"primary_keys:{primary_keys}") + test_table = pa.table( + { + "partitionkey": [partition_value] * number_of_records, + "primarykey": primary_keys, + "bucket": [1] * number_of_records, + } + ) + return test_table + + +# commit to s3 +def parquet_files_to_positional_delete_files(io, table_metadata, file_paths): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.POSITION_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(partitionkey="111"), + # partition=Record(**{"pk": "111", "bucket": 2}), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + 
**statistics.to_serialized_dict(), + ) + + yield data_file + + +def produce_pos_delete_file(io, table_metadata, file_path): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible(schema, parquet_metadata.schema.to_arrow_schema()) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.POSITION_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(partitionkey="111"), + # partition=Record(**{"pk": "111", "bucket": 2}), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + return data_file + + +def parquet_files_to_data_files(io, table_metadata, file_paths): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + # pv = Record(**{"pk": "222", "bucket": 1}) + pv = Record(**{"partitionkey": "111"}) + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=pv, + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + yield data_file + + +def add_delete_files(file_paths): + table = load_table(TABLE_VERSION) + + with table.transaction() as tx: + if table.metadata.name_mapping() is None: + table.set_properties( + **{ + "schema.name-mapping.default": table.table_metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().fast_append() as update_snapshot: + data_files = parquet_files_to_positional_delete_files( + table_metadata=table.metadata, file_paths=file_paths, io=table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + + +def add_data_files(file_paths): + table = load_table(TABLE_VERSION) + # table.refresh() + with table.transaction() as tx: + if table.metadata.name_mapping() is None: + tx.set_properties( + **{ + "schema.name-mapping.default": 
table.metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().fast_append() as update_snapshot: + data_files = parquet_files_to_data_files( + table_metadata=table.metadata, file_paths=file_paths, io=table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + + +def add_equality_data_files(file_paths): + table = load_table(TABLE_VERSION) + with table.transaction() as tx: + if table.metadata.name_mapping() is None: + tx.set_properties( + **{ + "schema.name-mapping.default": table.metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().fast_append() as update_snapshot: + data_files = parquet_files_to_equality_data_files( + table_metadata=table.metadata, file_paths=file_paths, io=table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + + +def parquet_files_to_equality_data_files(io, table_metadata, file_paths): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + import pyarrow.parquet as pq + from pyiceberg.typedef import Record + + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=DataFileContent.EQUALITY_DELETES, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(partitionkey="111"), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + yield data_file + + +def scan_table(table): + print(f"scan_table result:{table.scan().to_arrow().to_pydict()}") + + +def initialize_ray(): + if not ray.is_initialized(): + ray.init(local_mode=True, ignore_reinit_error=True) + + +# --------------------------------------------------------------------------- +# README: Temporary example to create a new Iceberg table using your AWS account. +# Use Pyiceberg + GLUE catalog to construct data files and equality delete files +# ADA assume the admin access role `IibsAdminAccess-DO-NOT-DELETE` to give access first. +# Calls DeltaCAT compute/converter_session.py to convert generated equality deletes to position deletes +# Position deletes can be read correctly through Pyiceberg pyarrow table scan. +# --------------------------------------------------------------------------- + +initialize_ray() +# Test with creating a new iceberg table, bump your version here: +TABLE_VERSION = "39" +iceberg_table_name = f"testio.example_{TABLE_VERSION}_partitioned" +create_table_with_data_files_and_equality_deletes(TABLE_VERSION) +table = load_table(TABLE_VERSION) + +# Using batch_number to simulate the snapshot sequence +# 1. 
commit equality delete batch 1, which shouldn't take into any effect as no data files is committed yet +batch_number_1 = 1 +data_file_paths = commit_equality_delete_to_table( + table, + to_be_deleted_batch_number=batch_number_1, + partition_value="1", + number_of_records=1, +) + +# 2. commit 3 records for data table batch 2 +batch_number_2 = 2 +commit_data_to_table( + table, batch_number=batch_number_2, partition_value="1", number_of_records=3 +) + +# 3. Commit 1 equality delete record for data table 2 +batch_number_3 = 3 +commit_equality_delete_to_table( + table, + to_be_deleted_batch_number=batch_number_2, + partition_value="1", + number_of_records=1, +) + +# 4. commit 3 records for data table batch 4 +batch_number_4 = 4 +commit_data_to_table( + table, batch_number=batch_number_4, partition_value="1", number_of_records=3 +) + +# 5. Commit 1 equality delete record for data table batch 4 +batch_number_5 = 5 +commit_equality_delete_to_table( + table, + to_be_deleted_batch_number=batch_number_4, + partition_value="1", + number_of_records=1, +) + +# 6. Commit 3 records for data table batch 6 +batch_number_6 = 6 +commit_data_to_table( + table, batch_number=batch_number_6, partition_value="1", number_of_records=3 +) + +# Result: +# Two pos delete record should be committed as the final result. + + +# Calls compute/converter here. +glue_catalog = get_glue_catalog() +converter_session_params = ConverterSessionParams.of( + { + "catalog": glue_catalog, + "iceberg_table_name": iceberg_table_name, + "iceberg_warehouse_bucket_name": get_s3_path(get_bucket_name()), + "merge_keys": ["primarykey"], + } +) + +converter_session(params=converter_session_params) diff --git a/deltacat/compute/converter/constants.py b/deltacat/compute/converter/constants.py new file mode 100644 index 00000000..77e4db2b --- /dev/null +++ b/deltacat/compute/converter/constants.py @@ -0,0 +1,4 @@ +DEFAULT_CONVERTER_TASK_MAX_PARALLELISM = 4096 + +# Safe limit ONLY considering CPU limit, typically 32 for a 8x-large worker +DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD = 30 diff --git a/deltacat/compute/converter/converter_session.py b/deltacat/compute/converter/converter_session.py new file mode 100644 index 00000000..c5f1b5ac --- /dev/null +++ b/deltacat/compute/converter/converter_session.py @@ -0,0 +1,143 @@ +# from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from deltacat.utils.ray_utils.concurrency import ( + invoke_parallel, + task_resource_options_provider, +) +import ray +import functools +from deltacat.compute.converter.utils.convert_task_options import ( + convert_resource_options_provider, +) +import logging +from deltacat import logs +from collections import defaultdict +from deltacat.compute.converter.model.converter_session_params import ( + ConverterSessionParams, +) +from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD +from deltacat.compute.converter.steps.convert import convert +from deltacat.compute.converter.model.convert_input import ConvertInput +from deltacat.compute.converter.pyiceberg.overrides import ( + fetch_all_bucket_files, + parquet_files_dict_to_iceberg_data_files, +) +from deltacat.compute.converter.utils.converter_session_utils import ( + check_data_files_sequence_number, +) +from deltacat.compute.converter.pyiceberg.replace_snapshot import ( + commit_overwrite_snapshot, +) +from deltacat.compute.converter.pyiceberg.catalog import load_table + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def converter_session(params: 
ConverterSessionParams, **kwargs): + """ + Convert equality delete to position delete. + Compute and memory heavy work from downloading equality delete table and compute position deletes + will be executed on Ray remote tasks. + """ + + catalog = params.catalog + table_name = params.iceberg_table_name + iceberg_table = load_table(catalog, table_name) + data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files( + iceberg_table + ) + + # files_for_each_bucket contains the following files list: + # {partition_value: [(equality_delete_files_list, data_files_list, pos_delete_files_list)] + files_for_each_bucket = defaultdict(tuple) + for k, v in data_file_dict.items(): + logger.info(f"data_file: k, v:{k, v}") + for k, v in equality_delete_dict.items(): + logger.info(f"equality_delete_file: k, v:{k, v}") + for partition_value, equality_delete_file_list in equality_delete_dict.items(): + ( + result_equality_delete_file, + result_data_file, + ) = check_data_files_sequence_number( + data_files_list=data_file_dict[partition_value], + equality_delete_files_list=equality_delete_dict[partition_value], + ) + logger.info(f"result_data_file:{result_data_file}") + logger.info(f"result_equality_delete_file:{result_equality_delete_file}") + files_for_each_bucket[partition_value] = ( + result_data_file, + result_equality_delete_file, + [], + ) + + iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name + print(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}") + merge_keys = params.merge_keys + # Using table identifier fields as merge keys if merge keys not provided + if not merge_keys: + identifier_fields_set = iceberg_table.schema().identifier_field_names() + identifier_fields = list(identifier_fields_set) + else: + identifier_fields = merge_keys + if len(identifier_fields) > 1: + raise NotImplementedError( + f"Multiple identifier fields lookup not supported yet." + ) + convert_options_provider = functools.partial( + task_resource_options_provider, + resource_amount_provider=convert_resource_options_provider, + ) + + # TODO (zyiqin): max_parallel_data_file_download should be determined by memory requirement for each bucket. + # Specifically, for case when files for one bucket memory requirement exceed one worker node's memory limit, WITHOUT rebasing with larger hash bucket count, + # 1. We can control parallel files to download by adjusting max_parallel_data_file_download. + # 2. Implement two-layer converter tasks, with convert tasks to spin up child convert tasks. + # Note that approach 2 will ideally require shared object store to avoid download equality delete files * number of child tasks times. + max_parallel_data_file_download = DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD + + compact_small_files = params.compact_small_files + position_delete_for_multiple_data_files = ( + params.position_delete_for_multiple_data_files + ) + task_max_parallelism = params.task_max_parallelism + + def convert_input_provider(index, item): + return { + "convert_input": ConvertInput.of( + files_for_each_bucket=item, + convert_task_index=index, + iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name, + identifier_fields=identifier_fields, + compact_small_files=compact_small_files, + position_delete_for_multiple_data_files=position_delete_for_multiple_data_files, + max_parallel_data_file_download=max_parallel_data_file_download, + ) + } + + # Ray remote task: convert + # Assuming that memory consume by each bucket doesn't exceed one node's memory limit. 
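+    # Illustrative shape of one fanned-out item below (file names are hypothetical):
+    #   ("1", ([[data_file_seq2], [data_file_seq4]],                 # data file groups
+    #          [[eq_delete_seq3, eq_delete_seq5], [eq_delete_seq5]], # eq-deletes at equal-or-newer sequence numbers
+    #          []))                                                  # existing position deletes
+    # convert_input_provider wraps each (partition_value, files) pair into a ConvertInput,
+    # and convert() pairs each data file group with its corresponding eq-delete group.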
+ # TODO: Add split mechanism to split large buckets + convert_tasks_pending = invoke_parallel( + items=files_for_each_bucket.items(), + ray_task=convert, + max_parallelism=task_max_parallelism, + options_provider=convert_options_provider, + kwargs_provider=convert_input_provider, + ) + to_be_deleted_files_list = [] + to_be_added_files_dict_list = [] + convert_results = ray.get(convert_tasks_pending) + for convert_result in convert_results: + to_be_deleted_files_list.extend(convert_result[0].values()) + to_be_added_files_dict_list.append(convert_result[1]) + + new_position_delete_files = parquet_files_dict_to_iceberg_data_files( + io=iceberg_table.io, + table_metadata=iceberg_table.metadata, + files_dict_list=to_be_added_files_dict_list, + ) + commit_overwrite_snapshot( + iceberg_table=iceberg_table, + # equality_delete_files + data file that all rows are deleted + to_be_deleted_files_list=to_be_deleted_files_list[0], + new_position_delete_files=new_position_delete_files, + ) diff --git a/deltacat/compute/converter/model/__init__.py b/deltacat/compute/converter/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/model/convert_input.py b/deltacat/compute/converter/model/convert_input.py new file mode 100644 index 00000000..556cb2f6 --- /dev/null +++ b/deltacat/compute/converter/model/convert_input.py @@ -0,0 +1,55 @@ +from __future__ import annotations +from typing import Dict, List + + +class ConvertInput(Dict): + @staticmethod + def of( + files_for_each_bucket, + convert_task_index, + iceberg_warehouse_bucket_name, + identifier_fields, + compact_small_files, + position_delete_for_multiple_data_files, + max_parallel_data_file_download, + ) -> ConvertInput: + + result = ConvertInput() + result["files_for_each_bucket"] = files_for_each_bucket + result["convert_task_index"] = convert_task_index + result["identifier_fields"] = identifier_fields + result["iceberg_warehouse_bucket_name"] = iceberg_warehouse_bucket_name + result["compact_small_files"] = compact_small_files + result[ + "position_delete_for_multiple_data_files" + ] = position_delete_for_multiple_data_files + result["max_parallel_data_file_download"] = max_parallel_data_file_download + return result + + @property + def files_for_each_bucket(self) -> tuple: + return self["files_for_each_bucket"] + + @property + def identifier_fields(self) -> List[str]: + return self["identifier_fields"] + + @property + def convert_task_index(self) -> int: + return self["convert_task_index"] + + @property + def iceberg_warehouse_bucket_name(self) -> str: + return self["iceberg_warehouse_bucket_name"] + + @property + def compact_small_files(self) -> bool: + return self["compact_small_files"] + + @property + def position_delete_for_multiple_data_files(self) -> bool: + return self["position_delete_for_multiple_data_files"] + + @property + def max_parallel_data_file_download(self) -> int: + return self["max_parallel_data_file_download"] diff --git a/deltacat/compute/converter/model/converter_session_params.py b/deltacat/compute/converter/model/converter_session_params.py new file mode 100644 index 00000000..ce9731cd --- /dev/null +++ b/deltacat/compute/converter/model/converter_session_params.py @@ -0,0 +1,81 @@ +from __future__ import annotations +from typing import Optional, Dict +from deltacat.compute.converter.constants import DEFAULT_CONVERTER_TASK_MAX_PARALLELISM + + +class ConverterSessionParams(dict): + """ + This class represents the parameters passed to convert_ 
(deltacat/compute/compactor/compaction_session.py) + """ + + @staticmethod + def of(params: Optional[Dict]) -> ConverterSessionParams: + params = {} if params is None else params + assert params.get("catalog") is not None, "catalog is a required arg" + assert ( + params.get("iceberg_table_name") is not None + ), "iceberg_table_name is a required arg" + assert ( + params.get("iceberg_warehouse_bucket_name") is not None + ), "iceberg_warehouse_bucket_name is a required arg" + result = ConverterSessionParams(params) + + result.compact_small_files = params.get("compact_small_files", False) + + # For Iceberg v3 spec, option to produce delete vector that can establish 1:1 mapping with data files. + result.position_delete_for_multiple_data_files = params.get( + "position_delete_for_multiple_data_files", True + ) + result.task_max_parallelism = params.get( + "task_max_parallelism", DEFAULT_CONVERTER_TASK_MAX_PARALLELISM + ) + result.merge_keys = params.get("merge_keys", None) + return result + + @property + def catalog(self): + return self["catalog"] + + @property + def iceberg_table_name(self) -> str: + return self["iceberg_table_name"] + + @property + def iceberg_warehouse_bucket_name(self) -> str: + return self["iceberg_warehouse_bucket_name"] + + @property + def compact_small_files(self) -> bool: + return self["compact_small_files"] + + @compact_small_files.setter + def compact_small_files(self, compact_small_files) -> None: + self["compact_small_files"] = compact_small_files + + @property + def position_delete_for_multiple_data_files(self) -> bool: + return self["position_delete_for_multiple_data_files"] + + @position_delete_for_multiple_data_files.setter + def position_delete_for_multiple_data_files( + self, position_delete_for_multiple_data_files + ) -> None: + self[ + "position_delete_for_multiple_data_files" + ] = position_delete_for_multiple_data_files + + @property + def task_max_parallelism(self) -> str: + return self["task_max_parallelism"] + + @task_max_parallelism.setter + def task_max_parallelism(self, task_max_parallelism) -> None: + self["task_max_parallelism"] = task_max_parallelism + + @property + def merge_keys(self) -> str: + return self["merge_keys"] + + @merge_keys.setter + def merge_keys(self, merge_keys) -> None: + self["merge_keys"] = merge_keys diff --git a/deltacat/compute/converter/pyiceberg/__init__.py b/deltacat/compute/converter/pyiceberg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/pyiceberg/catalog.py b/deltacat/compute/converter/pyiceberg/catalog.py new file mode 100644 index 00000000..ac12c080 --- /dev/null +++ b/deltacat/compute/converter/pyiceberg/catalog.py @@ -0,0 +1,75 @@ +from typing import Optional + + +def load_catalog(iceberg_catalog_name, iceberg_catalog_properties): + catalog = load_catalog( + name=iceberg_catalog_name, + **iceberg_catalog_properties, + ) + return catalog + + +def get_s3_path( + bucket_name: str, + database_name: Optional[str] = None, + table_name: Optional[str] = None, +) -> str: + result_path = f"s3://{bucket_name}" + if database_name is not None: + result_path += f"/{database_name}.db" + + if table_name is not None: + result_path += f"/{table_name}" + return result_path + + +def get_bucket_name(): + return "metadata-py4j-zyiqin1" + + +def get_s3_prefix(): + return get_s3_path(get_bucket_name()) + + +def get_credential(): + import boto3 + + boto3_session = boto3.Session() + credentials = boto3_session.get_credentials() + return credentials + + +def get_glue_catalog(): + from 
pyiceberg.catalog import load_catalog + + credential = get_credential() + # Credentials are refreshable, so accessing your access key / secret key + # separately can lead to a race condition. Use this to get an actual matched + # set. + credential = credential.get_frozen_credentials() + access_key_id = credential.access_key + secret_access_key = credential.secret_key + session_token = credential.token + s3_path = get_s3_prefix() + glue_catalog = load_catalog( + "glue", + **{ + "warehouse": s3_path, + "type": "glue", + "aws_access_key_id": access_key_id, + "aws_secret_access_key": secret_access_key, + "aws_session_token": session_token, + "region_name": "us-east-1", + "s3.access-key-id": access_key_id, + "s3.secret-access-key": secret_access_key, + "s3.session-token": session_token, + "s3.region": "us-east-1", + }, + ) + + return glue_catalog + + +def load_table(catalog, table_name): + loaded_table = catalog.load_table(table_name) + return loaded_table diff --git a/deltacat/compute/converter/pyiceberg/overrides.py b/deltacat/compute/converter/pyiceberg/overrides.py new file mode 100644 index 00000000..d9e67034 --- /dev/null +++ b/deltacat/compute/converter/pyiceberg/overrides.py @@ -0,0 +1,134 @@ +from collections import defaultdict +import logging +from deltacat import logs +import pyarrow.parquet as pq + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list): + from pyiceberg.io.pyarrow import ( + _check_pyarrow_schema_compatible, + data_file_statistics_from_parquet_metadata, + compute_statistics_plan, + parquet_path_to_id_mapping, + ) + from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ) + + data_file_content_type = DataFileContent.POSITION_DELETES + iceberg_files = [] + schema = table_metadata.schema() + for files_dict in files_dict_list: + for partition_value, file_paths in files_dict.items(): + for file_path in file_paths: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + _check_pyarrow_schema_compatible( + schema, parquet_metadata.schema.to_arrow_schema() + ) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan( + schema, table_metadata.properties + ), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + data_file = DataFile( + content=data_file_content_type, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=partition_value, + # partition=Record(**{"pk": "111", "bucket": 2}), + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + iceberg_files.append(data_file) + return iceberg_files + + +def fetch_all_bucket_files(table): + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + from pyiceberg.typedef import ( + KeyDefaultDict, + ) + + data_scan = table.scan() + snapshot = data_scan.snapshot() + if not snapshot: + return iter([]) + manifest_evaluators = KeyDefaultDict(data_scan._build_manifest_evaluator) + + manifests = [ + manifest_file + for manifest_file in snapshot.manifests(data_scan.io) + if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + ] + + # step 2: filter the data files 
in each manifest + # this filter depends on the partition spec used to write the manifest file + from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator + from pyiceberg.types import ( + strtobool, + ) + from pyiceberg.table import _min_sequence_number, _open_manifest + from pyiceberg.utils.concurrent import ExecutorFactory + from itertools import chain + from pyiceberg.manifest import DataFileContent + + partition_evaluators = KeyDefaultDict(data_scan._build_partition_evaluator) + metrics_evaluator = _InclusiveMetricsEvaluator( + data_scan.table_metadata.schema(), + data_scan.row_filter, + data_scan.case_sensitive, + strtobool(data_scan.options.get("include_empty_files", "false")), + ).eval + + min_sequence_number = _min_sequence_number(manifests) + + # {"bucket_index": List[DataFile]} + data_entries = defaultdict(list) + equality_data_entries = defaultdict(list) + positional_delete_entries = defaultdict(list) + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + ( + data_scan.io, + manifest, + partition_evaluators[manifest.partition_spec_id], + metrics_evaluator, + ) + for manifest in manifests + if data_scan._check_sequence_number(min_sequence_number, manifest) + ], + ) + ): + data_file = manifest_entry.data_file + file_sequence_number = manifest_entry.sequence_number + data_file_tuple = (file_sequence_number, data_file) + partition_value = data_file.partition + if data_file.content == DataFileContent.DATA: + data_entries[partition_value].append(data_file_tuple) + if data_file.content == DataFileContent.POSITION_DELETES: + positional_delete_entries[partition_value].append(data_file_tuple) + elif data_file.content == DataFileContent.EQUALITY_DELETES: + equality_data_entries[partition_value].append(data_file_tuple) + else: + logger.warning( + f"Unknown DataFileContent ({data_file.content}): {manifest_entry}" + ) + return data_entries, equality_data_entries, positional_delete_entries diff --git a/deltacat/compute/converter/pyiceberg/replace_snapshot.py b/deltacat/compute/converter/pyiceberg/replace_snapshot.py new file mode 100644 index 00000000..5f8f88a8 --- /dev/null +++ b/deltacat/compute/converter/pyiceberg/replace_snapshot.py @@ -0,0 +1,174 @@ +from typing import Optional, List +import uuid +from pyiceberg.table.snapshots import ( + Operation, +) +from pyiceberg.manifest import ( + DataFileContent, + ManifestEntry, + ManifestEntryStatus, + ManifestFile, + write_manifest, +) +import itertools +from pyiceberg.utils.concurrent import ExecutorFactory +from pyiceberg.table import UpdateSnapshot, _SnapshotProducer + + +class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]): + """Overwrites data from the table. This will produce an OVERWRITE snapshot. + + Data and delete files were added and removed in a logical overwrite operation. 
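+
+    As wired up by the replace() helper below, the snapshot operation recorded is
+    REPLACE when the table already has a current snapshot and APPEND otherwise.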
+ """ + + def _existing_manifests(self) -> List[ManifestFile]: + """Determine if there are any existing manifest files.""" + existing_files = [] + snapshot = self._transaction.table_metadata.current_snapshot() + if snapshot: + for manifest_file in snapshot.manifests(io=self._io): + entries = manifest_file.fetch_manifest_entry( + io=self._io, discard_deleted=True + ) + + found_deleted_data_files = [ + entry.data_file + for entry in entries + if entry.data_file in self._deleted_data_files + ] + + if len(found_deleted_data_files) == 0: + existing_files.append(manifest_file) + else: + # We have to replace the manifest file without the deleted data files + if any( + entry.data_file not in found_deleted_data_files + for entry in entries + ): + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[ + manifest_file.partition_spec_id + ], + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, + ) as writer: + [ + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + for entry in entries + if entry.data_file not in found_deleted_data_files + ] + existing_files.append(writer.to_manifest_file()) + return existing_files + + def _deleted_entries(self) -> List[ManifestEntry]: + """To determine if we need to record any deleted entries. + + With a full overwrite all the entries are considered deleted. + With partial overwrites we have to use the predicate to evaluate + which entries are affected. + """ + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id( + self._parent_snapshot_id + ) + if previous_snapshot is None: + # This should never happen since you cannot overwrite an empty table + raise ValueError( + f"Could not find the previous snapshot: {self._parent_snapshot_id}" + ) + + executor = ExecutorFactory.get_or_create() + + def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: + return [ + ManifestEntry( + status=ManifestEntryStatus.DELETED, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + for entry in manifest.fetch_manifest_entry( + self._io, discard_deleted=True + ) + if entry.data_file.content == DataFileContent.DATA + and entry.data_file in self._deleted_data_files + ] + + list_of_entries = executor.map( + _get_entries, previous_snapshot.manifests(self._io) + ) + return list(itertools.chain(*list_of_entries)) + else: + return [] + + +def replace( + self, + commit_uuid: Optional[uuid.UUID] = None, + using_starting_sequence: Optional[bool] = False, +) -> _ReplaceFiles: + print( + f"tansaction_current_snapshot:{self._transaction.table_metadata.current_snapshot()}" + ) + return _ReplaceFiles( + commit_uuid=commit_uuid, + operation=Operation.REPLACE + if self._transaction.table_metadata.current_snapshot() is not None + else Operation.APPEND, + transaction=self._transaction, + io=self._io, + snapshot_properties=self._snapshot_properties, + using_starting_sequence=using_starting_sequence, + ) + + +UpdateSnapshot.replace = replace + + +def commit_replace_snapshot( + iceberg_table, to_be_deleted_files_list, new_position_delete_files +): + tx = iceberg_table.transaction() + snapshot_properties = 
{} + commit_uuid = uuid.uuid4() + update_snapshot = tx.update_snapshot(snapshot_properties=snapshot_properties) + replace_snapshot = replace( + self=update_snapshot, commit_uuid=commit_uuid, using_starting_sequence=False + ) + for to_be_deleted_file in to_be_deleted_files_list: + replace_snapshot.append_data_file(to_be_deleted_file) + for to_be_added_file in new_position_delete_files: + replace_snapshot.delete_data_file(to_be_added_file) + replace_snapshot._commit() + tx.commit_transaction() + + +def commit_overwrite_snapshot( + iceberg_table, to_be_deleted_files_list, new_position_delete_files +): + commit_uuid = uuid.uuid4() + with iceberg_table.transaction() as tx: + if iceberg_table.metadata.name_mapping() is None: + iceberg_table.set_properties( + **{ + "schema.name-mapping.default": iceberg_table.table_metadata.schema().name_mapping.model_dump_json() + } + ) + with tx.update_snapshot().overwrite( + commit_uuid=commit_uuid + ) as overwrite_snapshot: + for data_file in new_position_delete_files: + overwrite_snapshot.append_data_file(data_file) + for original_data_file in to_be_deleted_files_list: + overwrite_snapshot.delete_data_file(original_data_file) diff --git a/deltacat/compute/converter/steps/__init__.py b/deltacat/compute/converter/steps/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/steps/convert.py b/deltacat/compute/converter/steps/convert.py new file mode 100644 index 00000000..e48f4c78 --- /dev/null +++ b/deltacat/compute/converter/steps/convert.py @@ -0,0 +1,188 @@ +import pyarrow.compute as pc +import deltacat.compute.converter.utils.iceberg_columns as sc +import pyarrow as pa +import numpy as np +import daft +from collections import defaultdict +import ray +import logging +from deltacat.compute.converter.model.convert_input import ConvertInput +from deltacat.compute.converter.utils.s3u import upload_table_with_retry +from deltacat import logs + +logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) + + +@ray.remote +def convert(convert_input: ConvertInput): + files_for_each_bucket = convert_input.files_for_each_bucket + convert_task_index = convert_input.convert_task_index + iceberg_warehouse_bucket_name = convert_input.iceberg_warehouse_bucket_name + identifier_fields = convert_input.identifier_fields + compact_small_files = convert_input.compact_small_files + position_delete_for_multiple_data_files = ( + convert_input.position_delete_for_multiple_data_files + ) + max_parallel_data_file_download = convert_input.max_parallel_data_file_download + + if not position_delete_for_multiple_data_files: + raise NotImplementedError( + f"Distributed file level position delete compute is not supported yet" + ) + if compact_small_files: + raise NotImplementedError(f"Compact previous position delete not supported yet") + + logger.info(f"Starting convert task index: {convert_task_index}") + data_files, equality_delete_files, position_delete_files = files_for_each_bucket[1] + partition_value = files_for_each_bucket[0] + ( + to_be_deleted_files_list, + to_be_added_files_list, + ) = compute_pos_delete_with_limited_parallelism( + data_files_list=data_files, + identifier_columns=identifier_fields, + equality_delete_files_list=equality_delete_files, + iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name, + max_parallel_data_file_download=max_parallel_data_file_download, + ) + to_be_delete_files_dict = defaultdict() + to_be_delete_files_dict[partition_value] = to_be_deleted_files_list + to_be_added_files_dict = 
defaultdict() + to_be_added_files_dict[partition_value] = to_be_added_files_list + return (to_be_delete_files_dict, to_be_added_files_dict) + + +def filter_rows_to_be_deleted( + equality_delete_table, data_file_table, identifier_columns +): + identifier_column = identifier_columns[0] + if equality_delete_table and data_file_table: + equality_deletes = pc.is_in( + data_file_table[identifier_column], + equality_delete_table[identifier_column], + ) + positional_delete_table = data_file_table.filter(equality_deletes) + logger.info(f"positional_delete_table:{positional_delete_table.to_pydict()}") + logger.info(f"data_file_table:{data_file_table.to_pydict()}") + logger.info( + f"length_pos_delete_table, {len(positional_delete_table)}, length_data_table:{len(data_file_table)}" + ) + if positional_delete_table: + positional_delete_table = positional_delete_table.drop(["primarykey"]) + if len(positional_delete_table) == len(data_file_table): + return True, None + return False, positional_delete_table + + +def compute_pos_delete( + equality_delete_table, + data_file_table, + identifier_columns, + iceberg_warehouse_bucket_name, +): + delete_whole_file, new_position_delete_table = filter_rows_to_be_deleted( + data_file_table=data_file_table, + equality_delete_table=equality_delete_table, + identifier_columns=identifier_columns, + ) + if new_position_delete_table: + logger.info(f"compute_pos_delete_table:{new_position_delete_table.to_pydict()}") + if new_position_delete_table: + new_pos_delete_s3_link = upload_table_with_retry( + new_position_delete_table, iceberg_warehouse_bucket_name, {} + ) + return delete_whole_file, new_pos_delete_s3_link + + +def download_bucketed_table(data_files, equality_delete_files): + from deltacat.utils.pyarrow import s3_file_to_table + + compacted_table = s3_file_to_table( + [data_file.file_path for data_file in data_files] + ) + equality_delete_table = s3_file_to_table( + [eq_file.file_path for eq_file in equality_delete_files] + ) + return compacted_table, equality_delete_table + + +def download_data_table(data_files, columns): + data_tables = [] + for file in data_files: + table = download_parquet_with_daft_hash_applied( + identify_columns=columns, file=file, s3_client_kwargs={} + ) + table = table.append_column( + sc._FILE_PATH_COLUMN_FIELD, + pa.array(np.repeat(file.file_path, len(table)), sc._FILE_PATH_COLUMN_TYPE), + ) + record_idx_iterator = iter(range(len(table))) + table = sc.append_record_idx_col(table, record_idx_iterator) + data_tables.append(table) + return pa.concat_tables(data_tables) + + +def compute_pos_delete_with_limited_parallelism( + data_files_list, + identifier_columns, + equality_delete_files_list, + iceberg_warehouse_bucket_name, + max_parallel_data_file_download, +): + to_be_deleted_file_list = [] + to_be_added_pos_delete_file_list = [] + + for data_files, equality_delete_files in zip( + data_files_list, equality_delete_files_list + ): + data_table = download_data_table( + data_files=data_files, columns=identifier_columns + ) + equality_delete_table = download_data_table( + data_files=equality_delete_files, columns=identifier_columns + ) + delete_whole_file, new_pos_delete_s3_link = compute_pos_delete( + equality_delete_table=equality_delete_table, + data_file_table=data_table, + iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name, + identifier_columns=identifier_columns, + ) + if delete_whole_file: + to_be_deleted_file_list.extend(data_files) + to_be_deleted_file_list.extend(equality_delete_files) + if new_pos_delete_s3_link: + 
to_be_added_pos_delete_file_list.extend(new_pos_delete_s3_link) + + to_be_deleted_file_list.extend(equality_delete_files) + logger.info(f"convert_to_be_deleted_file_list:{to_be_deleted_file_list}") + logger.info( + f"convert_to_be_added_pos_delete_file_list:{to_be_added_pos_delete_file_list}" + ) + return to_be_deleted_file_list, to_be_added_pos_delete_file_list + + +def download_parquet_with_daft_hash_applied( + identify_columns, file, s3_client_kwargs, **kwargs +): + from daft import TimeUnit + + # TODO: Add correct read kwargs as in: + # https://github.com/ray-project/deltacat/blob/383855a4044e4dfe03cf36d7738359d512a517b4/deltacat/utils/daft.py#L97 + + coerce_int96_timestamp_unit = TimeUnit.from_str( + kwargs.get("coerce_int96_timestamp_unit", "ms") + ) + + from deltacat.utils.daft import _get_s3_io_config + + # TODO: Use Daft SHA1 hash instead to minimize probably of data corruption + io_config = _get_s3_io_config(s3_client_kwargs=s3_client_kwargs) + df = daft.read_parquet( + path=file.file_path, + io_config=io_config, + coerce_int96_timestamp_unit=coerce_int96_timestamp_unit, + ) + logger.info(f"debug_identify_columns:{identify_columns}") + df = df.select(daft.col(identify_columns[0]).hash()) + arrow_table = df.to_arrow() + return arrow_table diff --git a/deltacat/compute/converter/utils/__init__.py b/deltacat/compute/converter/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/deltacat/compute/converter/utils/convert_task_options.py b/deltacat/compute/converter/utils/convert_task_options.py new file mode 100644 index 00000000..2d0f7532 --- /dev/null +++ b/deltacat/compute/converter/utils/convert_task_options.py @@ -0,0 +1,90 @@ +from typing import Optional, Dict +from deltacat.exceptions import RetryableError + +AVERAGE_FILE_PATH_COLUMN_SIZE_BYTES = 80 +AVERAGE_POS_COLUMN_SIZE_BYTES = 4 +XXHASH_BYTE_PER_RECORD = 8 +MEMORY_BUFFER_RATE = 1.2 + + +def estimate_fixed_hash_columns(hash_value_size_bytes_per_record, total_record_count): + return hash_value_size_bytes_per_record * total_record_count + + +def get_total_record_from_iceberg_files(iceberg_files_list): + total_record_count = 0 + for iceberg_files in iceberg_files_list: + total_record_count += sum(file.record_count for file in iceberg_files) + return total_record_count + + +def estimate_iceberg_pos_delete_additional_columns( + include_columns, num_of_record_count +): + total_additional_columns_sizes = 0 + if "file_path" in include_columns: + total_additional_columns_sizes += ( + AVERAGE_FILE_PATH_COLUMN_SIZE_BYTES * num_of_record_count + ) + elif "pos" in include_columns: + total_additional_columns_sizes += ( + AVERAGE_POS_COLUMN_SIZE_BYTES * num_of_record_count + ) + return total_additional_columns_sizes + + +def estimate_convert_remote_option_resources(data_files, equality_delete_files): + data_file_record_count = get_total_record_from_iceberg_files(data_files) + equality_delete_record_count = get_total_record_from_iceberg_files( + equality_delete_files + ) + hash_column_sizes = estimate_fixed_hash_columns( + XXHASH_BYTE_PER_RECORD, data_file_record_count + equality_delete_record_count + ) + pos_delete_sizes = estimate_iceberg_pos_delete_additional_columns( + ["file_path", "pos"], data_file_record_count + equality_delete_record_count + ) + total_memory_required = hash_column_sizes + pos_delete_sizes + return total_memory_required * MEMORY_BUFFER_RATE + + +def _get_task_options( + memory: float, + ray_custom_resources: Optional[Dict] = None, + scheduling_strategy: str = "SPREAD", +) -> Dict: + + # NOTE: 
With DEFAULT scheduling strategy in Ray 2.20.0, autoscaler does + # not spin up enough nodes fast and hence we see only approximately + # 20 tasks get scheduled out of 100 tasks in queue. Hence, we use SPREAD + # which is also ideal for merge and hash bucket tasks. + # https://docs.ray.io/en/latest/ray-core/scheduling/index.html + task_opts = { + "memory": memory, + "scheduling_strategy": scheduling_strategy, + } + + if ray_custom_resources: + task_opts["resources"] = ray_custom_resources + + task_opts["max_retries"] = 3 + + # List of possible botocore exceptions are available at + # https://github.com/boto/botocore/blob/develop/botocore/exceptions.py + task_opts["retry_exceptions"] = [RetryableError] + + print(f"estimated_memory:{memory}") + return task_opts + + +def convert_resource_options_provider(index, files_for_each_bucket): + print(f"option_files_for_each_bucket:{files_for_each_bucket}") + ( + data_files_list, + equality_delete_files_list, + position_delete_files_list, + ) = files_for_each_bucket[1] + memory_requirement = estimate_convert_remote_option_resources( + data_files_list, equality_delete_files_list + ) + return _get_task_options(memory=memory_requirement) diff --git a/deltacat/compute/converter/utils/converter_session_utils.py b/deltacat/compute/converter/utils/converter_session_utils.py new file mode 100644 index 00000000..3c5a9063 --- /dev/null +++ b/deltacat/compute/converter/utils/converter_session_utils.py @@ -0,0 +1,50 @@ +def check_data_files_sequence_number(data_files_list, equality_delete_files_list): + data_files_list.sort(key=lambda file_tuple: file_tuple[0]) + equality_delete_files_list.sort(key=lambda file_tuple: file_tuple[0]) + + equality_delete_files = [] + result_data_file = [] + + # Pointer for list data_file + data_file_pointer = 0 + + debug_equality_delete_files = [] + + # Loop through each value in equality_delete_file + for equality_file_tuple in equality_delete_files_list: + # Find all values in data_file that are smaller than val_equality + valid_values = [] + + debug_valid_values = [] + # Move data_file_pointer to the first value in data_file that is smaller than val_equality + while ( + data_file_pointer < len(data_files_list) + and data_files_list[data_file_pointer][0] < equality_file_tuple[0] + ): + valid_values.append(data_files_list[data_file_pointer][1]) + debug_valid_values.append(data_files_list[data_file_pointer]) + data_file_pointer += 1 + equality_delete_files.append(equality_file_tuple[1]) + debug_equality_delete_files.append(equality_file_tuple) + + # Append the value from equality_delete_file and the corresponding valid values from data_file + if valid_values: + result_data_file.append(valid_values) + + result_equality_delete_file = append_larger_sequence_number_data_files( + equality_delete_files + ) + + return result_equality_delete_file, result_data_file + + +def append_larger_sequence_number_data_files(data_files_list): + result = [] + # Iterate over the input list + for i in range(len(data_files_list)): + sublist = data_files_list[i:] + sublist_file_list = [] + for file in sublist: + sublist_file_list.append(file) + result.append(sublist_file_list) + return result diff --git a/deltacat/compute/converter/utils/iceberg_columns.py b/deltacat/compute/converter/utils/iceberg_columns.py new file mode 100644 index 00000000..2bd795bb --- /dev/null +++ b/deltacat/compute/converter/utils/iceberg_columns.py @@ -0,0 +1,38 @@ +import pyarrow as pa +from typing import Union + + +def _get_iceberg_col_name(suffix): + return f"{suffix}" + + 
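+# The "pos" and "file_path" columns below mirror the Iceberg spec's position-delete
+# metadata columns, whose reserved field IDs (2147483545 and 2147483546) are the ones
+# used in the example schema. A resulting position-delete table looks like this
+# (paths shown are hypothetical):
+#   pa.table({
+#       "file_path": ["s3://bucket/data/part-0.parquet", "s3://bucket/data/part-0.parquet"],
+#       "pos": [0, 5],
+#   })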
+_ORDERED_RECORD_IDX_COLUMN_NAME = _get_iceberg_col_name("pos") +_ORDERED_RECORD_IDX_COLUMN_TYPE = pa.int64() +_ORDERED_RECORD_IDX_COLUMN_FIELD = pa.field( + _ORDERED_RECORD_IDX_COLUMN_NAME, + _ORDERED_RECORD_IDX_COLUMN_TYPE, +) + + +def get_record_index_column_array(obj) -> Union[pa.Array, pa.ChunkedArray]: + return pa.array( + obj, + _ORDERED_RECORD_IDX_COLUMN_TYPE, + ) + + +def append_record_idx_col(table: pa.Table, ordered_record_indices) -> pa.Table: + + table = table.append_column( + _ORDERED_RECORD_IDX_COLUMN_FIELD, + get_record_index_column_array(ordered_record_indices), + ) + return table + + +_FILE_PATH_COLUMN_NAME = _get_iceberg_col_name("file_path") +_FILE_PATH_COLUMN_TYPE = pa.string() +_FILE_PATH_COLUMN_FIELD = pa.field( + _FILE_PATH_COLUMN_NAME, + _FILE_PATH_COLUMN_TYPE, +) diff --git a/deltacat/compute/converter/utils/s3u.py b/deltacat/compute/converter/utils/s3u.py new file mode 100644 index 00000000..5fe9ef4a --- /dev/null +++ b/deltacat/compute/converter/utils/s3u.py @@ -0,0 +1,129 @@ +from tenacity import ( + Retrying, + retry_if_exception_type, + stop_after_delay, + wait_random_exponential, +) +from typing import Union +from deltacat.aws.s3u import CapturedBlockWritePaths, UuidBlockWritePathProvider +from deltacat.types.tables import ( + get_table_writer, + get_table_length, + TABLE_CLASS_TO_SLICER_FUNC, +) +from typing import Optional, Dict, Any, List +from deltacat.exceptions import RetryableError +from deltacat.storage import ( + DistributedDataset, + LocalTable, +) +from deltacat.types.media import ( + ContentEncoding, + ContentType, +) +from deltacat.aws.s3u import UPLOAD_SLICED_TABLE_RETRY_STOP_AFTER_DELAY +import s3fs + + +def get_credential(): + import boto3 + + boto3_session = boto3.Session() + credentials = boto3_session.get_credentials() + return credentials + + +def get_s3_file_system(content_type): + token_holder = get_credential() + content_encoding = ContentEncoding.IDENTITY + + s3_file_system = s3fs.S3FileSystem( + key=token_holder.access_key, + secret=token_holder.secret_key, + token=token_holder.token, + s3_additional_kwargs={ + "ServerSideEncryption": "aws:kms", + # TODO: Get tagging from table properties + "ContentType": content_type.value, + "ContentEncoding": content_encoding.value, + }, + ) + return s3_file_system + + +def upload_table_with_retry( + table: Union[LocalTable, DistributedDataset], + s3_url_prefix: str, + s3_table_writer_kwargs: Optional[Dict[str, Any]], + content_type: ContentType = ContentType.PARQUET, + max_records_per_file: Optional[int] = 4000000, + **s3_client_kwargs, +) -> List[str]: + """ + Writes the given table to 1 or more S3 files and return Redshift + manifest entries describing the uploaded files. 
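+
+    Concretely, the return value here is the list of S3 write paths captured by the
+    UuidBlockWritePathProvider for the files written under s3_url_prefix.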
+ """ + retrying = Retrying( + wait=wait_random_exponential(multiplier=1, max=60), + stop=stop_after_delay(UPLOAD_SLICED_TABLE_RETRY_STOP_AFTER_DELAY), + retry=retry_if_exception_type(RetryableError), + ) + + if s3_table_writer_kwargs is None: + s3_table_writer_kwargs = {} + + s3_file_system = get_s3_file_system(content_type=content_type) + capture_object = CapturedBlockWritePaths() + block_write_path_provider = UuidBlockWritePathProvider(capture_object) + s3_table_writer_func = get_table_writer(table) + table_record_count = get_table_length(table) + if max_records_per_file is None or not table_record_count: + retrying( + fn=upload_table, + table_slices=table, + s3_base_url=f"{s3_url_prefix}", + s3_file_system=s3_file_system, + s3_table_writer_func=s3_table_writer_func, + s3_table_writer_kwargs=s3_table_writer_kwargs, + block_write_path_provider=block_write_path_provider, + content_type=content_type, + **s3_client_kwargs, + ) + else: + table_slicer_func = TABLE_CLASS_TO_SLICER_FUNC.get(type(table)) + table_slices = table_slicer_func(table, max_records_per_file) + for table_slice in table_slices: + retrying( + fn=upload_table, + table_slices=table_slice, + s3_base_url=f"{s3_url_prefix}", + s3_file_system=s3_file_system, + s3_table_writer_func=s3_table_writer_func, + s3_table_writer_kwargs=s3_table_writer_kwargs, + block_write_path_provider=block_write_path_provider, + content_type=content_type, + **s3_client_kwargs, + ) + del block_write_path_provider + write_paths = capture_object.write_paths() + return write_paths + + +def upload_table( + table_slices, + s3_base_url, + s3_file_system, + s3_table_writer_func, + block_write_path_provider, + content_type, + s3_table_writer_kwargs, +): + s3_table_writer_func( + table_slices, + s3_base_url, + s3_file_system, + block_write_path_provider, + content_type.value, + **s3_table_writer_kwargs, + ) + # TODO: Add a proper fix for block_refs and write_paths not persisting in Ray actors diff --git a/deltacat/tests/storage/rivulet/writer/test_memtable_dataset_writer.py b/deltacat/tests/storage/rivulet/writer/test_memtable_dataset_writer.py index 79efde79..03108af1 100644 --- a/deltacat/tests/storage/rivulet/writer/test_memtable_dataset_writer.py +++ b/deltacat/tests/storage/rivulet/writer/test_memtable_dataset_writer.py @@ -5,7 +5,9 @@ from deltacat.storage.rivulet.metastore.manifest import JsonManifestIO from deltacat.storage.rivulet import Schema from deltacat.storage.rivulet.schema.datatype import Datatype -from deltacat.storage.rivulet.writer.memtable_dataset_writer import MemtableDatasetWriter +from deltacat.storage.rivulet.writer.memtable_dataset_writer import ( + MemtableDatasetWriter, +) @pytest.fixture @@ -32,10 +34,10 @@ def file_store(): @pytest.fixture def writer(location_provider, test_schema): return MemtableDatasetWriter( - location_provider=location_provider, - schema=test_schema + location_provider=location_provider, schema=test_schema ) + def test_write_after_flush(writer, file_store): writer.write_dict({"id": 100, "name": "alpha"}) manifest_uri_1 = writer.flush() @@ -60,8 +62,12 @@ def test_write_after_flush(writer, file_store): assert len(sst_files_2) > 0, "Second flush: no SST files found." # ensures data_files and sst_files from first write are not included in second write. - assert data_files_1.isdisjoint(data_files_2), \ - "Expected no overlap of data files between first and second flush." - assert sst_files_1.isdisjoint(sst_files_2), \ - "Expected no overlap of SST files between first and second flush." 
- assert manifest_2.context.schema == writer.schema, "Schema mismatch in second flush." + assert data_files_1.isdisjoint( + data_files_2 + ), "Expected no overlap of data files between first and second flush." + assert sst_files_1.isdisjoint( + sst_files_2 + ), "Expected no overlap of SST files between first and second flush." + assert ( + manifest_2.context.schema == writer.schema + ), "Schema mismatch in second flush."