diff --git a/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py b/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py
index e6e4d95f..69d17aee 100644
--- a/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py
+++ b/deltacat/compute/equality_delete_to_position_delete_conveter/converter.py
@@ -12,6 +12,8 @@ def convert_equality_deletes_to_position_deletes(
         snapshot_id,
         iceberg_catalog_name,
         iceberg_catalog_properties,
+        iceberg_spec_format_version,
+        memory_resource_planner,
         **kwargs):
     """
     Convert equality delete to position delete.
@@ -25,45 +27,124 @@ def convert_equality_deletes_to_position_deletes(
     iceberg_table = fetch_table(catalog, source_table_identifier)
     num_of_buckets = get_num_of_bucket(iceberg_table)
 
+    from deltacat.equality_delete_to_position_delete_converter.overrides import fetch_all_bucket_files
+    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(iceberg_table, num_of_buckets)
+
+    convert_options_provider = functools.partial(
+        task_resource_options_provider,
+        pg_config=params.pg_config,
+        resource_amount_provider=memory_resource_planner,
+        equality_delete_file_meta=equality_delete_dict[bucket_index],
+        primary_keys=params.primary_keys,
+        deltacat_storage=params.deltacat_storage,
+        deltacat_storage_kwargs=params.deltacat_storage_kwargs,
+        ray_custom_resources=params.ray_custom_resources,
+        memory_logs_enabled=params.memory_logs_enabled,
+        estimate_resources_params=params.estimate_resources_params,
+    )
+
+    def convert_input_provider(index, item) -> dict[str, ConvertInput]:
+        return {
+            "input": ConvertInput.of(
+                bucket_index=bucket_index,
+                equality_delete_file=equality_delete_dict[index],
+                position_delete_file=pos_delete_dict[index],
+                data_file_dict=data_file_dict[index],
+                primary_keys=params.primary_keys,
+                sort_keys=params.sort_keys,
+                compact_small_files=compact_small_files,
+                # copied from compactor v2
+                enable_profiler=params.enable_profiler,
+                metrics_config=params.metrics_config,
+                s3_table_writer_kwargs=params.s3_table_writer_kwargs,
+                read_kwargs_provider=params.read_kwargs_provider,
+                deltacat_storage=params.deltacat_storage,
+                deltacat_storage_kwargs=params.deltacat_storage_kwargs,
+            )
+        }
+
     # Ray remote task: convert
-    # Assuming that memory comsume by each bucket doesn't exceed one node's memory limit.
+    # Assuming that the memory consumed by each bucket doesn't exceed one node's memory limit.
     # TODO: Add split mechanism to split large buckets
     convert_tasks_pending = invoke_parallel(
+        items=all_files_for_bucket.items(),
         ray_task=convert,
         max_parallelism=task_max_parallelism,
         options_provider=convert_options_provider,
         kwargs_provider=convert_input_provider,
     )
+    materialized_s3_links = ray.get(convert_tasks_pending)
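# --- Editorial sketch (not part of the patch) ---------------------------------
# Context for the invoke_parallel call above: options_provider(i, item) is assumed to
# return the Ray .options(**kwargs) dict for the i-th convert task (the real provider
# is task_resource_options_provider bound via functools.partial, sized from the
# bucket's equality-delete file metadata). A minimal, hypothetical stand-in:
def sketch_convert_options_provider(index, item, **kwargs) -> dict:
    estimated_task_memory_bytes = 2 * 1024 ** 3  # hypothetical flat 2 GiB per bucket
    return {"memory": estimated_task_memory_bytes, "num_cpus": 1}
# -------------------------------------------------------------------------------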
-    # Commit ideally a REPLACE type snapshot, not supported yet in Pyiceberg, leave as placeholder for now.
-    commit_s3_files_to_iceberg_snapshot(materialized_s3_links)
+    from deltacat.equality_delete_to_position_delete_converter.overrides import replace
+    with iceberg_table.transaction() as tx:
+        snapshot_properties = {}
+        commit_uuid = uuid.uuid4()
+        with tx.update_snapshot(snapshot_properties=snapshot_properties).replace(
+            commit_uuid=commit_uuid, using_starting_sequence=True
+        ) as replace_snapshot:
+            replace_snapshot.append_data_file(materialized_s3_links)
+            replace_snapshot.delete_data_file(equality_delete_files)
+            replace_snapshot._commit()
 
-@ray.remote
-def convert(table, snapshot_id, bucket_index):
-    # Load Icberg table, get S3 links
-    data_files, equality_delete_files = load_table_files(table, snapshot_id, bucket_index)
-    # Read from S3
-    compacted_table, equality_delete_table = download_bucketed_table(data_files=data_files,
-                                                                     equality_delete_files=equality_delete_files,
+@ray.remote
+def convert(bucket_index,
+            equality_delete_file,
+            position_delete_file,
+            data_file_dict,
+            primary_keys,
+            sort_keys,
+            compact_small_files):
+
+    # Download ONLY the equality delete table's primary key columns and put them in the Ray object store
+    equality_delete_table = download_bucketed_table(equality_delete_files=equality_delete_file,
                                                     columns=primary_key_columns)
-    # Compute
-    compacted_table = append_record_idx(compacted_table)
-    position_delete_table = filter_rows_to_be_deleted(compacted_table, equality_delete_table)
-    position_delete_table = append_data_file_path(position_delete_table)
+    equality_delete_table_ref = ray.put(equality_delete_table)
 
-    # Write to S3, get S3 link back
-    positional_delete_files = materialize_positional_delete_table_to_s3(position_delete_table)
+    def pos_compute_options_provider(memory_resource_planner):
+        return memory_resource_planner(data_file_primary_key_memory
+                                       + previous_pos_delete_file_primary_key_memory
+                                       + file_path_column_memory
+                                       + pos_column_memory)
+
+    file_level_compute_tasks_pending_ref = invoke_parallel(
+        items=data_file_dict.items(),
+        ray_task=compute_pos_delete_file_level,
+        max_parallelism=task_max_parallelism,
+        options_provider=pos_compute_options_provider,
+        kwargs_provider=pos_compute_input_provider,
+    )
+
+    materialized_pos_delete_file_links = ray.get(file_level_compute_tasks_pending_ref)
 
     # For query performance purpose, setting lower_bound == upper bound will help filter out positional delete files,
     # Code link: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L52
     set_lower_upper_bound_positional_delete_metadata(positional_delete_files)
-    return positional_delete_files
+    return materialized_pos_delete_file_links
+
+
+@ray.remote
+def compute_pos_delete_file_level(equality_delete_table_ref, data_file, previous_pos_delete_files):
+    # Fetch the equality delete table from the Ray object store
+    equality_delete_table = ray.get(equality_delete_table_ref)
+    data_file_table = download_bucketed_table(files=data_file,
+                                              columns=primary_key_columns)
+
+    previous_pos_delete_table = None
+    if previous_pos_delete_files:
+        previous_pos_delete_table = download_bucketed_table(files=previous_pos_delete_files,
+                                                            columns=primary_key_columns)
+
+    data_file_table = append_record_idx(data_file_table)
+    new_position_delete_table = filter_rows_to_be_deleted(data_file_table, equality_delete_table)
+    pos_delete_table = new_position_delete_table
+    if previous_pos_delete_table is not None:
+        pos_delete_table = pa.concat_tables([previous_pos_delete_table, new_position_delete_table])
+    pos_delete_table = append_data_file_path(pos_delete_table)
+
+    # Write to S3, get the S3 link back
+    new_positional_delete_file = materialize_positional_delete_table_to_s3(pos_delete_table)
+    return new_positional_delete_file
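# --- Editorial sketch (not part of the patch) ---------------------------------
# Illustrates, with plain PyArrow, the computation the helpers above are assumed to
# perform: tag each data-file row with its ordinal position (append_record_idx), keep
# the rows whose primary key appears in the equality-delete table
# (filter_rows_to_be_deleted), and emit (file_path, pos) records (append_data_file_path).
# All names here are illustrative, not the real helper implementations.
import pyarrow as pa
import pyarrow.compute as pc

def sketch_position_deletes(data_file_table: pa.Table, equality_delete_table: pa.Table,
                            primary_key: str, file_path: str) -> pa.Table:
    pos = pa.array(range(data_file_table.num_rows), type=pa.int64())
    with_pos = data_file_table.append_column("pos", pos)
    delete_keys = pa.array(equality_delete_table[primary_key].to_pylist())
    deleted = with_pos.filter(pc.is_in(with_pos[primary_key], value_set=delete_keys))
    return pa.table({
        "file_path": pa.array([file_path] * deleted.num_rows),
        "pos": deleted["pos"],
    })

# Example: rows with pk 2 and 3 become position deletes at positions 1 and 2.
data = pa.table({"pk": [1, 2, 3]})
eq_deletes = pa.table({"pk": [2, 3]})
print(sketch_position_deletes(data, eq_deletes, "pk", "s3://bucket/data-file-0.parquet"))
# -------------------------------------------------------------------------------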
 
 def fetch_table(iceberg_catalog, table_identifer):
     iceberg_table = iceberg_catalog.load_table(table_identifer)
diff --git a/deltacat/compute/equality_delete_to_position_delete_conveter/overrides.py b/deltacat/compute/equality_delete_to_position_delete_conveter/overrides.py
new file mode 100644
index 00000000..abfe297a
--- /dev/null
+++ b/deltacat/compute/equality_delete_to_position_delete_conveter/overrides.py
@@ -0,0 +1,201 @@
+# NOTE: these imports follow the pyiceberg version this module was written against;
+# several of the private helpers (e.g. _SnapshotProducer) live under
+# pyiceberg.table.update.snapshot in newer releases.
+import itertools
+import logging
+import uuid
+from typing import List, Optional
+
+from pyiceberg.manifest import (
+    DataFileContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+    write_manifest,
+)
+from pyiceberg.table import _SnapshotProducer
+from pyiceberg.table.snapshots import Operation
+from pyiceberg.utils.concurrent import ExecutorFactory
+
+logger = logging.getLogger(__name__)
+
+
+def fetch_all_bucket_files(table, number_of_buckets):
+    # Step 1: filter manifests using partition summaries.
+    # The filter depends on the partition spec used to write the manifest file,
+    # so create a cache of filters for each spec id.
+    from pyiceberg.typedef import KeyDefaultDict
+
+    data_scan = table.scan()
+    snapshot = data_scan.snapshot()
+    if not snapshot:
+        return [], [], []
+    manifest_evaluators = KeyDefaultDict(data_scan._build_manifest_evaluator)
+
+    manifests = [
+        manifest_file
+        for manifest_file in snapshot.manifests(data_scan.io)
+        if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
+    ]
+
+    # Step 2: filter the data files in each manifest.
+    # This filter depends on the partition spec used to write the manifest file.
+    from itertools import chain
+
+    from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
+    from pyiceberg.table import _min_sequence_number, _open_manifest
+    from pyiceberg.types import strtobool
+
+    partition_evaluators = KeyDefaultDict(data_scan._build_partition_evaluator)
+    metrics_evaluator = _InclusiveMetricsEvaluator(
+        data_scan.table_metadata.schema(),
+        data_scan.row_filter,
+        data_scan.case_sensitive,
+        strtobool(data_scan.options.get("include_empty_files", "false")),
+    ).eval
+
+    min_sequence_number = _min_sequence_number(manifests)
+
+    # {"bucket_index": List[DataFile]} -- TODO: key these by bucket index; collected as flat lists for now
+    data_entries = []
+    equality_data_entries = []
+    positional_delete_entries = []
+
+    executor = ExecutorFactory.get_or_create()
+    for manifest_entry in chain(
+        *executor.map(
+            lambda args: _open_manifest(*args),
+            [
+                (
+                    data_scan.io,
+                    manifest,
+                    partition_evaluators[manifest.partition_spec_id],
+                    metrics_evaluator,
+                )
+                for manifest in manifests
+                if data_scan._check_sequence_number(min_sequence_number, manifest)
+            ],
+        )
+    ):
+        data_file = manifest_entry.data_file
+        if data_file.content == DataFileContent.DATA:
+            data_entries.append(manifest_entry)
+        elif data_file.content == DataFileContent.POSITION_DELETES:
+            positional_delete_entries.append(manifest_entry)
+        elif data_file.content == DataFileContent.EQUALITY_DELETES:
+            equality_data_entries.append(data_file)
+        else:
+            logger.warning(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
+    return data_entries, equality_data_entries, positional_delete_entries
+
+
+def replace(self, commit_uuid: Optional[uuid.UUID] = None, using_starting_sequence: Optional[bool] = False) -> "_ReplaceFiles":
+    logger.debug(f"transaction current snapshot: {self._transaction.table_metadata.current_snapshot()}")
+    return _ReplaceFiles(
+        commit_uuid=commit_uuid,
+        operation=Operation.REPLACE
+        if self._transaction.table_metadata.current_snapshot() is not None
+        else Operation.APPEND,
+        transaction=self._transaction,
+        io=self._io,
+        snapshot_properties=self._snapshot_properties,
+        using_starting_sequence=using_starting_sequence,
+    )
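# --- Editorial sketch (not part of the patch) ---------------------------------
# converter.py calls tx.update_snapshot(...).replace(...), so the replace() function
# above is presumably meant to be attached to pyiceberg's UpdateSnapshot at import
# time, roughly as below. The import path is an assumption and varies by pyiceberg
# release (pyiceberg.table in older versions, pyiceberg.table.update.snapshot later).
from pyiceberg.table import UpdateSnapshot  # assumed location of UpdateSnapshot

UpdateSnapshot.replace = replace
# -------------------------------------------------------------------------------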
+# Overrides _SnapshotProducer._manifests to allow reusing the deleted files' sequence number
+def _manifests(self) -> List[ManifestFile]:
+    def _write_added_manifest() -> List[ManifestFile]:
+        replace_manifest_using_starting_sequence = None
+        if self.using_starting_sequence and self._operation == Operation.REPLACE and self._deleted_data_files:
+            snapshot = self._transaction.table_metadata.current_snapshot()
+            relevant_entries = [
+                entry
+                for manifest_file in snapshot.manifests(io=self._io)
+                for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=False)
+                if entry.data_file in self._deleted_data_files
+            ]
+            if relevant_entries:
+                replace_manifest_using_starting_sequence = min(
+                    entry.sequence_number for entry in relevant_entries
+                )
+        if self._added_data_files:
+            with write_manifest(
+                format_version=self._transaction.table_metadata.format_version,
+                spec=self._transaction.table_metadata.spec(),
+                schema=self._transaction.table_metadata.schema(),
+                output_file=self.new_manifest_output(),
+                snapshot_id=self._snapshot_id,
+            ) as writer:
+                for data_file in self._added_data_files:
+                    writer.add(
+                        ManifestEntry(
+                            status=ManifestEntryStatus.ADDED,
+                            snapshot_id=self._snapshot_id,
+                            # Inherit the starting sequence number of the replaced files when available;
+                            # None lets the commit assign the new snapshot's sequence number.
+                            sequence_number=replace_manifest_using_starting_sequence,
+                            file_sequence_number=None,
+                            data_file=data_file,
+                        )
+                    )
+            return [writer.to_manifest_file()]
+        else:
+            return []
+
+    # NOTE: the remainder of _manifests (delete/existing manifest handling and the
+    # executor fan-out) is expected to mirror the upstream pyiceberg implementation.
+
+
+class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]):
+    """Replace data files in the table.
+
+    This produces a REPLACE snapshot (or an APPEND snapshot when the table has no
+    current snapshot); data and delete files are added and removed in a logical
+    replace operation.
+    """
+
+    def _existing_manifests(self) -> List[ManifestFile]:
+        """Determine if there are any existing manifest files."""
+        existing_files = []
+
+        if snapshot := self._transaction.table_metadata.current_snapshot():
+            for manifest_file in snapshot.manifests(io=self._io):
+                entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
+
+                found_deleted_data_files = [
+                    entry.data_file for entry in entries if entry.data_file in self._deleted_data_files
+                ]
+
+                if len(found_deleted_data_files) == 0:
+                    existing_files.append(manifest_file)
+                else:
+                    # We have to rewrite the manifest file without the deleted data files
+                    if any(entry.data_file not in found_deleted_data_files for entry in entries):
+                        with write_manifest(
+                            format_version=self._transaction.table_metadata.format_version,
+                            spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+                            schema=self._transaction.table_metadata.schema(),
+                            output_file=self.new_manifest_output(),
+                            snapshot_id=self._snapshot_id,
+                        ) as writer:
+                            for entry in entries:
+                                if entry.data_file not in found_deleted_data_files:
+                                    writer.add_entry(
+                                        ManifestEntry(
+                                            status=ManifestEntryStatus.EXISTING,
+                                            snapshot_id=entry.snapshot_id,
+                                            sequence_number=entry.sequence_number,
+                                            file_sequence_number=entry.file_sequence_number,
+                                            data_file=entry.data_file,
+                                        )
+                                    )
+                        existing_files.append(writer.to_manifest_file())
+        return existing_files
+
+    def _deleted_entries(self) -> List[ManifestEntry]:
+        """Determine which entries need to be recorded as deleted.
+
+        With a full replace all the entries are considered deleted.
+        With partial replaces we have to use the predicate to evaluate
+        which entries are affected.
+ """ + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if previous_snapshot is None: + # This should never happen since you cannot overwrite an empty table + raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + + executor = ExecutorFactory.get_or_create() + + def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: + return [ + ManifestEntry( + status=ManifestEntryStatus.DELETED, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files + ] + + list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) + return list(itertools.chain(*list_of_entries)) + else: + return [] + +