Commit 3d5149d

Update with producing file level pos deletes

Zyiqin-Miranda committed Nov 6, 2024
1 parent 74502d3 commit 3d5149d
Showing 2 changed files with 299 additions and 17 deletions.
@@ -12,6 +12,8 @@ def convert_equality_deletes_to_position_deletes(
snapshot_id,
iceberg_catalog_name,
iceberg_catalog_properties,
iceberg_spec_format_version,
memory_resource_planner,
**kwargs):
"""
Convert equality delete to position delete.
@@ -25,45 +27,124 @@ def convert_equality_deletes_to_position_deletes(
iceberg_table = fetch_table(catalog, source_table_identifier)
num_of_buckets = get_num_of_bucket(iceberg_table)

from deltacat.equality_delete_to_position_delete_converter.overrides import fetch_all_bucket_files
data_file_dict, equality_delete_dict, position_delete_dict = fetch_all_bucket_files(iceberg_table, num_of_buckets)

convert_options_provider = functools.partial(
task_resource_options_provider,
pg_config=params.pg_config,
resource_amount_provider=memory_resource_planner,
equality_delete_file_meta=equality_delete_dict[bucket_index],
primary_keys=params.primary_keys,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
estimate_resources_params=params.estimate_resources_params,
)
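# Illustrative sketch (not part of this commit): `memory_resource_planner` is supplied by the
# caller as the resource_amount_provider above; its exact contract is not shown in this diff.
# A minimal placeholder, assuming it receives the per-bucket equality delete file metadata and
# returns an estimated task memory in bytes, could look like this (names and fields are assumptions):
def example_memory_resource_planner(index, item, equality_delete_file_meta, **kwargs):
    estimated_bytes = sum(meta.file_size_in_bytes for meta in equality_delete_file_meta)
    # Leave headroom for decompression and intermediate pyarrow tables.
    return estimated_bytes * 2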

def convert_input_provider(index, item) -> dict[str, ConvertInput]:
return {
"input": ConvertInput.of(
bucket_index=bucket_index,
equality_delete_file=equality_delete_dict[index],
position_delete_file=position_delete_dict[index],
data_file_dict=data_file_dict[index],
primary_keys=params.primary_keys,
sort_keys=params.sort_keys,
compact_small_files=compact_small_files,
# copied from compactor v2
enable_profiler=params.enable_profiler,
metrics_config=params.metrics_config,
s3_table_writer_kwargs=params.s3_table_writer_kwargs,
read_kwargs_provider=params.read_kwargs_provider,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
)
}
# Ray remote task: convert
# Assumes the memory consumed by each bucket does not exceed a single node's memory limit.
# TODO: Add split mechanism to split large buckets
convert_tasks_pending = invoke_parallel(
items=all_files_for_bucket.items(),
ray_task=convert,
max_parallelism=task_max_parallelism,
options_provider=convert_options_provider,
kwargs_provider=convert_input_provider,
)

materialized_s3_links = ray.get(convert_tasks_pending)

# Ideally commit a REPLACE-type snapshot; not yet supported natively in PyIceberg, so left as a placeholder for now.
commit_s3_files_to_iceberg_snapshot(materialized_s3_links)
from deltacat.equality_delete_to_position_delete_converter.overrides import replace

with iceberg_table.transaction() as tx:
snapshot_properties = {}
commit_uuid = uuid.uuid4()
with tx.update_snapshot(snapshot_properties=snapshot_properties).replace(
commit_uuid=commit_uuid, using_starting_sequence=True
) as replace_snapshot:
replace_snapshot.append_data_file(materialized_s3_links)
replace_snapshot.delete_data_file(equality_delete_files)
replace_snapshot._commit()
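# Illustrative sketch (not part of this commit): `equality_delete_files` passed to delete_data_file
# above is not defined in this hunk. Assuming it is the flattened set of equality delete files
# across buckets, and that each convert task returns a list of materialized links, the two commit
# inputs could be gathered roughly like this:
def example_gather_commit_inputs(convert_results, equality_delete_dict):
    materialized_s3_links = [link for links in convert_results for link in links]
    equality_delete_files = [f for files in equality_delete_dict.values() for f in files]
    return materialized_s3_links, equality_delete_files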

@ray.remote
def convert(table, snapshot_id, bucket_index):
# Load Iceberg table, get S3 links
data_files, equality_delete_files = load_table_files(table, snapshot_id, bucket_index)

# Read from S3
compacted_table, equality_delete_table = download_bucketed_table(data_files=data_files,
equality_delete_files=equality_delete_files,
@ray.remote
def convert(bucket_index,
equality_delete_file,
position_delete_file,
data_file_dict,
primary_keys,
sort_keys,
compact_small_files):

# Download ONLY the equality delete table's primary keys and store them in the Ray Plasma store
equality_delete_table = download_bucketed_table(equality_delete_files=equality_delete_file,
                                                columns=primary_key_columns)

# Compute
compacted_table = append_record_idx(compacted_table)
position_delete_table = filter_rows_to_be_deleted(compacted_table, equality_delete_table)
position_delete_table = append_data_file_path(position_delete_table)
equality_delete_table_ref = ray.put(equality_delete_table)

# Write to S3, get S3 link back
positional_delete_files = materialize_positional_delete_table_to_s3(position_delete_table)
def pos_compute_options_provider(memory_resource_planner):
    return memory_resource_planner(data_file_primary_key_memory
                                   + previous_pos_delete_file_primary_key_memory
                                   + file_path_column_memory
                                   + pos_column_memory)


file_level_compute_tasks_pending_ref = invoke_parallel(
    items=data_file_dict.items(),
    ray_task=compute_pos_delete_file_level,
    max_parallelism=task_max_parallelism,
    options_provider=pos_compute_options_provider,
    kwargs_provider=pos_compute_input_provider,
)

materialized_pos_delete_file_link = ray.get(file_level_compute_tasks_pending_ref)

# For query performance purposes, setting lower_bound == upper_bound helps Iceberg filter positional delete files
# down to the single data file they reference; see
# https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L52
# and the illustrative sketch after this function.
set_lower_upper_bound_positional_delete_metadata(materialized_pos_delete_file_link)

return materialized_pos_delete_file_link
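# Illustrative sketch (not part of this commit): `set_lower_upper_bound_positional_delete_metadata`
# is referenced above but not defined in this diff. One way it could work, assuming PyIceberg
# DataFile objects with `lower_bounds`/`upper_bounds` dicts and a hypothetical
# `referenced_data_file_path` attribute on each materialized positional delete file (both
# assumptions), is to pin the reserved `file_path` column bounds to that single path:
from pyiceberg.conversions import to_bytes
from pyiceberg.types import StringType

DELETE_FILE_PATH_FIELD_ID = 2147483546  # reserved field id of the position-delete `file_path` column

def set_lower_upper_bound_positional_delete_metadata(positional_delete_files):
    for delete_file in positional_delete_files:
        # `referenced_data_file_path` is hypothetical; in this converter each positional delete
        # file is produced for exactly one data file.
        serialized_path = to_bytes(StringType(), delete_file.referenced_data_file_path)
        # lower_bound == upper_bound lets Iceberg prune this delete file to a single data file.
        delete_file.lower_bounds[DELETE_FILE_PATH_FIELD_ID] = serialized_path
        delete_file.upper_bounds[DELETE_FILE_PATH_FIELD_ID] = serialized_path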

@ray.remote
def compute_pos_delete_file_level(equality_delete_table_ref, data_file, previous_pos_delete_files):
# fetch equality delete table from ray plasma store
equality_delete_table = ray.get(equality_delete_table_ref)

data_file_table = download_bucketed_table(files=data_file,
                                          columns=primary_key_columns)

data_file_table = append_record_idx(data_file_table)
new_position_delete_table = filter_rows_to_be_deleted(data_file_table, equality_delete_table)

# Merge with any previously committed positional delete files for this data file
if previous_pos_delete_files:
    previous_pos_delete_table = download_bucketed_table(files=previous_pos_delete_files,
                                                        columns=primary_key_columns)
    pos_delete_table = pa.concat_tables([previous_pos_delete_table, new_position_delete_table])
else:
    pos_delete_table = new_position_delete_table
pos_delete_table = append_data_file_path(pos_delete_table)

# Write to S3, get S3 link back
new_positional_delete_file = materialize_positional_delete_table_to_s3(pos_delete_table)
return new_positional_delete_file
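# Illustrative sketch (not part of this commit): `pos_compute_input_provider`, used as the
# kwargs_provider inside `convert` above, is not shown in this diff. Assuming the same
# (index, item) -> kwargs contract as `convert_input_provider`, it could be built as a closure
# over the bucket-level state and package the arguments of `compute_pos_delete_file_level`
# roughly like this (the per-file lookup of previously committed positional delete files is an
# assumption):
def make_example_pos_compute_input_provider(equality_delete_table_ref, position_delete_file):
    def pos_compute_input_provider(index, item):
        file_key, data_file = item
        return {
            "equality_delete_table_ref": equality_delete_table_ref,
            "data_file": data_file,
            "previous_pos_delete_files": position_delete_file.get(file_key) if position_delete_file else None,
        }
    return pos_compute_input_provider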

def fetch_table(iceberg_catalog, table_identifier):
    iceberg_table = iceberg_catalog.load_table(table_identifier)
@@ -0,0 +1,201 @@
def fetch_all_bucket_files(table, number_of_buckets):
# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
from pyiceberg.typedef import (
EMPTY_DICT,
IcebergBaseModel,
IcebergRootModel,
Identifier,
KeyDefaultDict,
)

data_scan = table.scan()
snapshot = data_scan.snapshot()
if not snapshot:
    return {}, {}, {}
manifest_evaluators = KeyDefaultDict(data_scan._build_manifest_evaluator)

manifests = [
manifest_file
for manifest_file in snapshot.manifests(data_scan.io)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file
from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
from pyiceberg.types import (
strtobool,
)
from pyiceberg.table import _min_sequence_number, _open_manifest
from pyiceberg.utils.concurrent import ExecutorFactory
from itertools import chain
from pyiceberg.manifest import DataFileContent

partition_evaluators = KeyDefaultDict(data_scan._build_partition_evaluator)
metrics_evaluator = _InclusiveMetricsEvaluator(
data_scan.table_metadata.schema(),
data_scan.row_filter,
data_scan.case_sensitive,
strtobool(data_scan.options.get("include_empty_files", "false")),
).eval

min_sequence_number = _min_sequence_number(manifests)

from collections import defaultdict

# {bucket_index: List[ManifestEntry]} for data and position-delete entries, {bucket_index: List[DataFile]} for equality deletes
data_entries = defaultdict(list)
equality_data_entries = defaultdict(list)
positional_delete_entries = defaultdict(list)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
(
data_scan.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
metrics_evaluator,
)
for manifest in manifests
if data_scan._check_sequence_number(min_sequence_number, manifest)
],
)
):
data_file = manifest_entry.data_file
# Key by the bucket value of the (assumed single) bucket partition field so the converter can
# process one bucket at a time.
bucket_value = data_file.partition[0]
if data_file.content == DataFileContent.DATA:
    data_entries[bucket_value].append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
    positional_delete_entries[bucket_value].append(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
    equality_data_entries[bucket_value].append(data_file)
else:
    logger.warning(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
return data_entries, equality_data_entries, positional_delete_entries
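# Illustrative usage (not part of this commit): the converter entry point consumes the three
# per-bucket mappings returned above, roughly like this (the .get defaults are assumptions):
def example_iterate_buckets(iceberg_table, num_of_buckets):
    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(iceberg_table, num_of_buckets)
    for bucket_index, equality_delete_files in equality_delete_dict.items():
        data_files = data_file_dict.get(bucket_index, [])
        existing_pos_deletes = pos_delete_dict.get(bucket_index, [])
        # One convert task is dispatched per bucket with these three groups of files.
        yield bucket_index, data_files, equality_delete_files, existing_pos_deletes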

def replace(self, commit_uuid: Optional[uuid.UUID] = None, using_starting_sequence: Optional[bool] = False) -> _ReplaceFiles:
print(f"transaction_current_snapshot: {self._transaction.table_metadata.current_snapshot()}")
return _ReplaceFiles(
commit_uuid=commit_uuid,
operation=Operation.REPLACE
if self._transaction.table_metadata.current_snapshot() is not None
else Operation.APPEND,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
using_starting_sequence=using_starting_sequence,
)

# Overrides SnapshotProducer to allow using deleted files' sequence number
def _manifests(self) -> List[ManifestFile]:
def _write_added_manifest() -> List[ManifestFile]:
replace_manifest_using_starting_sequence = None
if self.using_starting_sequence and self._operation == Operation.REPLACE and self._deleted_entries:
snapshot = self._transaction.table_metadata.current_snapshot()
for manifest_file in snapshot.manifests(io=self._io):
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=False)
relevant_manifests = [entry for entry in entries if entry.data_file in self._deleted_data_files]
replace_manifest_using_starting_sequence = min(
    (entry.sequence_number for entry in relevant_manifests),
    default=replace_manifest_using_starting_sequence)
if self._added_data_files:
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for data_file in self._added_data_files:
writer.add(
ManifestEntry(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
sequence_number=replace_manifest_using_starting_sequence if replace_manifest_using_starting_sequence else None,
file_sequence_number=None,
data_file=data_file,
)
)
return [writer.to_manifest_file()]
else:
return []


class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]):
    """Replace files in the table.

    This produces a REPLACE snapshot (or an APPEND snapshot when the table has no current snapshot);
    data and delete files are added and removed in a logical replace operation.
    """

def _existing_manifests(self) -> List[ManifestFile]:
"""Determine if there are any existing manifest files."""
existing_files = []

if snapshot := self._transaction.table_metadata.current_snapshot():
for manifest_file in snapshot.manifests(io=self._io):
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)

found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]

if len(found_deleted_data_files) == 0:
existing_files.append(manifest_file)
else:
# We have to replace the manifest file without the deleted data files
if any(entry.data_file not in found_deleted_data_files for entry in entries):
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
[
writer.add_entry(
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=entry.snapshot_id,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
)
for entry in entries
if entry.data_file not in found_deleted_data_files
]
existing_files.append(writer.to_manifest_file())
return existing_files

def _deleted_entries(self) -> List[ManifestEntry]:
"""To determine if we need to record any deleted entries.
With a full overwrite all the entries are considered deleted.
With partial overwrites we have to use the predicate to evaluate
which entries are affected.
"""
if self._parent_snapshot_id is not None:
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if previous_snapshot is None:
# This should never happen since you cannot overwrite an empty table
raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")

executor = ExecutorFactory.get_or_create()

def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
return [
ManifestEntry(
status=ManifestEntryStatus.DELETED,
snapshot_id=entry.snapshot_id,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
]

list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
return list(itertools.chain(*list_of_entries))
else:
return []
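# Illustrative wiring (not part of this commit; an assumption about how these overrides get
# registered): the converter's commit path calls `tx.update_snapshot(...).replace(...)`, which
# stock PyIceberg does not expose, so the `replace` override above would need to be attached to
# PyIceberg's UpdateSnapshot, e.g. by monkey-patching (module path assumes a PyIceberg 0.7-style
# layout):
def example_install_replace_override():
    from pyiceberg.table import UpdateSnapshot  # assumption: UpdateSnapshot lives here
    UpdateSnapshot.replace = replace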

