[2.0] Add initial eq-to-pos delete job (#356)
* [WIP] Add eq-to-pos delete job session draft

* Update with producing file level pos deletes

* Resolve dependency conflicts with 2.0 branch

* Add more documentation to example; code cleanup

* Bump linter version to fix linter + reformatting
Zyiqin-Miranda authored Jan 27, 2025
1 parent ae2c61c commit 9d15ee5
Showing 19 changed files with 1,729 additions and 8 deletions.
Empty file.
554 changes: 554 additions & 0 deletions deltacat/compute/converter/_dev/example_single_merge_key_converter.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions deltacat/compute/converter/constants.py
@@ -0,0 +1,4 @@
DEFAULT_CONVERTER_TASK_MAX_PARALLELISM = 4096

# Safe limit considering ONLY the CPU limit; typically 32 for an 8x-large worker
DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD = 30
143 changes: 143 additions & 0 deletions deltacat/compute/converter/converter_session.py
@@ -0,0 +1,143 @@
# from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from deltacat.utils.ray_utils.concurrency import (
invoke_parallel,
task_resource_options_provider,
)
import ray
import functools
from deltacat.compute.converter.utils.convert_task_options import (
convert_resource_options_provider,
)
import logging
from deltacat import logs
from collections import defaultdict
from deltacat.compute.converter.model.converter_session_params import (
ConverterSessionParams,
)
from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD
from deltacat.compute.converter.steps.convert import convert
from deltacat.compute.converter.model.convert_input import ConvertInput
from deltacat.compute.converter.pyiceberg.overrides import (
fetch_all_bucket_files,
parquet_files_dict_to_iceberg_data_files,
)
from deltacat.compute.converter.utils.converter_session_utils import (
check_data_files_sequence_number,
)
from deltacat.compute.converter.pyiceberg.replace_snapshot import (
commit_overwrite_snapshot,
)
from deltacat.compute.converter.pyiceberg.catalog import load_table

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def converter_session(params: ConverterSessionParams, **kwargs):
"""
Convert equality delete to position delete.
Compute and memory heavy work from downloading equality delete table and compute position deletes
will be executed on Ray remote tasks.
"""

catalog = params.catalog
table_name = params.iceberg_table_name
iceberg_table = load_table(catalog, table_name)
data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
iceberg_table
)

    # files_for_each_bucket maps each partition value to a tuple of file lists:
    # {partition_value: (data_files_list, equality_delete_files_list, pos_delete_files_list)}
files_for_each_bucket = defaultdict(tuple)
for k, v in data_file_dict.items():
logger.info(f"data_file: k, v:{k, v}")
for k, v in equality_delete_dict.items():
logger.info(f"equality_delete_file: k, v:{k, v}")
for partition_value, equality_delete_file_list in equality_delete_dict.items():
(
result_equality_delete_file,
result_data_file,
) = check_data_files_sequence_number(
data_files_list=data_file_dict[partition_value],
equality_delete_files_list=equality_delete_dict[partition_value],
)
logger.info(f"result_data_file:{result_data_file}")
logger.info(f"result_equality_delete_file:{result_equality_delete_file}")
files_for_each_bucket[partition_value] = (
result_data_file,
result_equality_delete_file,
[],
)

iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
print(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}")
merge_keys = params.merge_keys
    # Use the table's identifier fields as merge keys if merge keys are not provided.
    if not merge_keys:
        identifier_fields_set = iceberg_table.schema().identifier_field_names()
        identifier_fields = list(identifier_fields_set)
    else:
        identifier_fields = merge_keys
    if len(identifier_fields) > 1:
        raise NotImplementedError(
            "Multiple identifier fields are not supported yet."
        )
convert_options_provider = functools.partial(
task_resource_options_provider,
resource_amount_provider=convert_resource_options_provider,
)

    # TODO (zyiqin): max_parallel_data_file_download should be determined by the memory requirement of each bucket.
    # Specifically, when the files for one bucket exceed a single worker node's memory limit, WITHOUT rebasing with a larger hash bucket count:
    # 1. We can control how many files are downloaded in parallel by adjusting max_parallel_data_file_download.
    # 2. We can implement two-layer converter tasks, where a convert task spins up child convert tasks.
    # Note that approach 2 would ideally require a shared object store to avoid downloading the equality delete files once per child task.
max_parallel_data_file_download = DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD

compact_small_files = params.compact_small_files
position_delete_for_multiple_data_files = (
params.position_delete_for_multiple_data_files
)
task_max_parallelism = params.task_max_parallelism

def convert_input_provider(index, item):
return {
"convert_input": ConvertInput.of(
files_for_each_bucket=item,
convert_task_index=index,
iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
identifier_fields=identifier_fields,
compact_small_files=compact_small_files,
position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
max_parallel_data_file_download=max_parallel_data_file_download,
)
}

    # Ray remote task: convert
    # Assumes that the memory consumed by each bucket doesn't exceed one node's memory limit.
    # TODO: Add a split mechanism to split large buckets
convert_tasks_pending = invoke_parallel(
items=files_for_each_bucket.items(),
ray_task=convert,
max_parallelism=task_max_parallelism,
options_provider=convert_options_provider,
kwargs_provider=convert_input_provider,
)
to_be_deleted_files_list = []
to_be_added_files_dict_list = []
convert_results = ray.get(convert_tasks_pending)
for convert_result in convert_results:
to_be_deleted_files_list.extend(convert_result[0].values())
to_be_added_files_dict_list.append(convert_result[1])

new_position_delete_files = parquet_files_dict_to_iceberg_data_files(
io=iceberg_table.io,
table_metadata=iceberg_table.metadata,
files_dict_list=to_be_added_files_dict_list,
)
commit_overwrite_snapshot(
iceberg_table=iceberg_table,
        # equality delete files + data files whose rows are all deleted
to_be_deleted_files_list=to_be_deleted_files_list[0],
new_position_delete_files=new_position_delete_files,
)
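
For orientation, a minimal usage sketch of the new converter session follows. This is a hypothetical example, not part of the commit: the catalog name, catalog properties, table identifier, and warehouse bucket are illustrative placeholders.

# Hypothetical usage sketch; all names below are placeholders.
import ray
from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)
from deltacat.compute.converter.pyiceberg.catalog import load_catalog

ray.init()  # or connect to an existing Ray cluster

# Load an Iceberg catalog through pyiceberg (e.g. a Glue catalog).
catalog = load_catalog(
    iceberg_catalog_name="glue",
    iceberg_catalog_properties={"type": "glue"},
)

params = ConverterSessionParams.of(
    {
        "catalog": catalog,
        "iceberg_table_name": "my_db.my_table",
        "iceberg_warehouse_bucket_name": "my-warehouse-bucket",
        # "merge_keys": ["id"],  # optional; defaults to the table's identifier fields
    }
)

# Downloads equality deletes on Ray remote tasks, writes position delete
# files, and commits an overwrite snapshot to the table.
converter_session(params=params)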
Empty file.
55 changes: 55 additions & 0 deletions deltacat/compute/converter/model/convert_input.py
@@ -0,0 +1,55 @@
from __future__ import annotations
from typing import Dict, List


class ConvertInput(Dict):
@staticmethod
def of(
files_for_each_bucket,
convert_task_index,
iceberg_warehouse_bucket_name,
identifier_fields,
compact_small_files,
position_delete_for_multiple_data_files,
max_parallel_data_file_download,
) -> ConvertInput:

result = ConvertInput()
result["files_for_each_bucket"] = files_for_each_bucket
result["convert_task_index"] = convert_task_index
result["identifier_fields"] = identifier_fields
result["iceberg_warehouse_bucket_name"] = iceberg_warehouse_bucket_name
result["compact_small_files"] = compact_small_files
result[
"position_delete_for_multiple_data_files"
] = position_delete_for_multiple_data_files
result["max_parallel_data_file_download"] = max_parallel_data_file_download
return result

@property
def files_for_each_bucket(self) -> tuple:
return self["files_for_each_bucket"]

@property
def identifier_fields(self) -> List[str]:
return self["identifier_fields"]

@property
def convert_task_index(self) -> int:
return self["convert_task_index"]

@property
def iceberg_warehouse_bucket_name(self) -> str:
return self["iceberg_warehouse_bucket_name"]

@property
def compact_small_files(self) -> bool:
return self["compact_small_files"]

@property
def position_delete_for_multiple_data_files(self) -> bool:
return self["position_delete_for_multiple_data_files"]

@property
def max_parallel_data_file_download(self) -> int:
return self["max_parallel_data_file_download"]
81 changes: 81 additions & 0 deletions deltacat/compute/converter/model/converter_session_params.py
@@ -0,0 +1,81 @@
from __future__ import annotations
from typing import Optional, Dict, List
from deltacat.compute.converter.constants import DEFAULT_CONVERTER_TASK_MAX_PARALLELISM


class ConverterSessionParams(dict):
"""
This class represents the parameters passed to convert_ (deltacat/compute/compactor/compaction_session.py)
"""

@staticmethod
def of(params: Optional[Dict]) -> ConverterSessionParams:
params = {} if params is None else params
assert params.get("catalog") is not None, "catalog is a required arg"
assert (
params.get("iceberg_table_name") is not None
), "iceberg_table_name is a required arg"
assert (
params.get("iceberg_warehouse_bucket_name") is not None
), "iceberg_warehouse_bucket_name is a required arg"
result = ConverterSessionParams(params)

result.compact_small_files = params.get("compact_small_files", False)

        # For the Iceberg v3 spec: option to produce delete vectors that establish a 1:1 mapping with data files.
result.position_delete_for_multiple_data_files = params.get(
"position_delete_for_multiple_data_files", True
)
result.task_max_parallelism = params.get(
"task_max_parallelism", DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
)
result.merge_keys = params.get("merge_keys", None)
return result

@property
def catalog(self):
return self["catalog"]

@property
def iceberg_table_name(self) -> str:
return self["iceberg_table_name"]

@property
def iceberg_warehouse_bucket_name(self) -> str:
return self["iceberg_warehouse_bucket_name"]

@property
def compact_small_files(self) -> bool:
return self["compact_small_files"]

@compact_small_files.setter
def compact_small_files(self, compact_small_files) -> None:
self["compact_small_files"] = compact_small_files

@property
def position_delete_for_multiple_data_files(self) -> bool:
return self["position_delete_for_multiple_data_files"]

@position_delete_for_multiple_data_files.setter
def position_delete_for_multiple_data_files(
self, position_delete_for_multiple_data_files
) -> None:
self[
"position_delete_for_multiple_data_files"
] = position_delete_for_multiple_data_files

@property
    def task_max_parallelism(self) -> int:
return self["task_max_parallelism"]

@task_max_parallelism.setter
def task_max_parallelism(self, task_max_parallelism) -> None:
self["task_max_parallelism"] = task_max_parallelism

@property
    def merge_keys(self) -> Optional[List[str]]:
return self["merge_keys"]

@merge_keys.setter
def merge_keys(self, merge_keys) -> None:
self["merge_keys"] = merge_keys
Empty file.
75 changes: 75 additions & 0 deletions deltacat/compute/converter/pyiceberg/catalog.py
@@ -0,0 +1,75 @@
from typing import Optional


def load_catalog(iceberg_catalog_name, iceberg_catalog_properties):
    # Use pyiceberg's load_catalog under an alias so this wrapper does not
    # shadow it and call itself recursively.
    from pyiceberg.catalog import load_catalog as pyiceberg_load_catalog

    catalog = pyiceberg_load_catalog(
        name=iceberg_catalog_name,
        **iceberg_catalog_properties,
    )
    return catalog


def get_s3_path(
bucket_name: str,
database_name: Optional[str] = None,
table_name: Optional[str] = None,
) -> str:
result_path = f"s3://{bucket_name}"
if database_name is not None:
result_path += f"/{database_name}.db"

if table_name is not None:
result_path += f"/{table_name}"
return result_path


def get_bucket_name():
return "metadata-py4j-zyiqin1"


def get_s3_prefix():
return get_s3_path(get_bucket_name())


def get_credential():
import boto3

boto3_session = boto3.Session()
credentials = boto3_session.get_credentials()
return credentials


def get_glue_catalog():
from pyiceberg.catalog import load_catalog

credential = get_credential()
# Credentials are refreshable, so accessing your access key / secret key
# separately can lead to a race condition. Use this to get an actual matched
# set.
credential = credential.get_frozen_credentials()
access_key_id = credential.access_key
secret_access_key = credential.secret_key
session_token = credential.token
s3_path = get_s3_prefix()
glue_catalog = load_catalog(
"glue",
**{
"warehouse": s3_path,
"type": "glue",
"aws_access_key_id": access_key_id,
"aws_secret_access_key": secret_access_key,
"aws_session_token": session_token,
"region_name": "us-east-1",
"s3.access-key-id": access_key_id,
"s3.secret-access-key": secret_access_key,
"s3.session-token": session_token,
"s3.region": "us-east-1",
},
)

return glue_catalog


def load_table(catalog, table_name):
loaded_table = catalog.load_table(table_name)
return loaded_table
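
Taken together, these helpers build a Glue-backed pyiceberg catalog from the caller's frozen boto3 credentials and load tables from it. A hedged sketch of combining them (the table identifier is a placeholder):

# Assumes AWS credentials are available to boto3 and that the Glue catalog
# contains the referenced table; "my_db.my_table" is a placeholder.
from deltacat.compute.converter.pyiceberg.catalog import (
    get_glue_catalog,
    load_table,
)

catalog = get_glue_catalog()
table = load_table(catalog, "my_db.my_table")
print(table.schema())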