
[Tests passing] [2.0] Add initial eq-to-pos delete job #356

Merged: 5 commits merged into 2.0 from equality-to-position-job-session on Jan 27, 2025

Conversation

Zyiqin-Miranda (Member)

Opening this for overall high-level feedback.

@Zyiqin-Miranda force-pushed the equality-to-position-job-session branch from 1c30d06 to 3d5149d on November 6, 2024 20:03
@Zyiqin-Miranda (Member Author)

The first version of the converter, with a test verifying correctness, is working here.
For easier review, an overview of the converter as it currently stands:

  1. Fetch all equality delete files, data files, and previous position delete files in one loop, keyed by partition value.
  2. Each bucket's files carry a file sequence number (similar to the storage layer's stream_position), and we fetch ONLY the data files relevant to the equality delete files. "Relevant" follows the Iceberg spec: an equality delete file must be applied to a data file when all of the following are true:
     - The data file's data sequence number is strictly less than the delete's data sequence number.
     - The data file's partition (both spec id and partition values) is equal to the delete file's partition, or the delete file's partition spec is unpartitioned.
  3. The convert remote function uses the Daft native reader to read only the hash values of the merge key (primary key) columns, appends file_path and row_index, and uses zero-copy PyArrow is_in and filter functions to find the positions to delete (a sketch of this step follows the list).
  4. Upload the new position delete files to S3 and commit an overwrite snapshot (replace is not supported yet).
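
A minimal PyArrow sketch of the is_in/filter step in item 3 (toy values; column names like "primarykey", "file_path", and "pos" are illustrative, not the converter's actual schema):

import pyarrow as pa
import pyarrow.compute as pc

# Hashed primary keys read from one data file, with file path and row index attached.
data_file_table = pa.table({
    "primarykey": ["a", "b", "c", "d"],
    "file_path": ["s3://bucket/data-1.parquet"] * 4,
    "pos": [0, 1, 2, 3],
})

# Hashed primary keys read from the applicable equality delete files.
equality_delete_table = pa.table({"primarykey": ["b", "d"]})

# Membership test: mark rows whose key appears in the equality deletes.
matches = pc.is_in(
    data_file_table["primarykey"],
    value_set=equality_delete_table["primarykey"].combine_chunks(),
)

# Keep only (file_path, pos) for matched rows; this becomes the position delete content.
positional_delete_table = data_file_table.filter(matches).select(["file_path", "pos"])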

@Zyiqin-Miranda marked this pull request as ready for review on January 2, 2025 18:51
@raghumdani (Collaborator) left a comment:

Thanks for putting this full implementation together. Great work so far. A couple of things I think would be useful here:

  1. Modularize all the invocations of the catalog client so that we can write unit tests for them independently, and swap them out when internal catalog support is available (an illustrative sketch follows this list).
  2. I would not compare hashes here. Although the probability is low, collisions can theoretically occur and we cannot detect or recover from them.
  3. We can add e2e functional tests; I see only one sanity test so far.
  4. Would it be simpler to have a separate package for this implementation and use deltacat as a dependency of that package, since there is only a one-way dependency? I fear we may create highly coupled functions over time, making maintenance of deltacat (with DeltaCAT 2.0) harder.
  5. Move all the short-term hacks, like the overrides, into a _private module to emphasize the danger of taking any dependency on those functions.
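
For item 1, an illustrative shape of that seam (the interface name and method signatures below are hypothetical, not an existing deltacat API):

from typing import Any, Dict, List, Protocol


class ConverterCatalogClient(Protocol):
    # All catalog interactions go through one interface so unit tests can stub
    # it and an internal catalog implementation can replace it later.
    def load_table(self, table_identifier: str) -> Any:
        ...

    def commit_position_deletes(self, table: Any, files: List[Dict[str, Any]]) -> None:
        ...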

deltacat/compute/compactor/utils/system_columns.py (review thread resolved; outdated)
deltacat/compute/converter/steps/convert.py (review thread resolved; outdated)
deltacat/compute/converter/steps/convert.py (review thread resolved; outdated)
data_file_table["primarykey"],
equality_delete_table["primarykey"],
)
positional_delete_table = data_file_table.filter(equality_deletes)
Collaborator:

This looks like a table we get after filtering all the rows matching equality delete values.

Member Author:

Are you suggesting a naming change here?

Collaborator:

Yes, the name doesn't reflect what it really is.

)

from deltacat.utils.daft import _get_s3_io_config
# TODO: Use Daft SHA1 hash instead to minimize probability of data corruption
Collaborator:

As long as we use sha1, we are at the mercy of probability. Although chances are low, it can happen and cause correctness issues. I don't think we should be using any kind of hashing here to check for equality.

Member:

I think using SHA-1 is still the right choice here to keep memory requirements predictable. Although a probability of collision exists, some probability of data corruption always exists that may be outside of your control (beyond perhaps eventually consistent detection and rectification mechanisms).

In this case, the chance of a SHA-1 collision is likely lower than the probability of introducing corrupt results anyway due to, say, writing back results from memory corrupted by a hardware failure (e.g., given the non-zero frequency of these types of errors observed while running compaction and similar jobs at scale internally at Amazon over the past year).

Member:

If you want a middle-ground approach, perhaps you could choose a record count at which the probability of collision is high enough to necessitate switching (e.g., perhaps in the septillions of records, assuming we ever manage datasets that get there).

@Zyiqin-Miranda (Member Author), Jan 21, 2025:

+1 for continuing to use SHA1 as the hashing approach here.
Regarding @raghumdani's concern about using SHA1 for primary key lookups: by the birthday problem, we only reach a 50% collision probability at roughly 1.4 × 10^24 records (on the order of a septillion records in ONE bucket; see the quick bound check after this list), which is not a reasonable expectation for our current data volume. So I don't think that will be a legitimate concern for us in the near future.
The pros of hashing primary keys are:

  1. Simplified memory estimation logic.
  2. Avoids OOM.
  3. Efficient cluster usage; no need to account for variable-length string primary keys.
  4. Avoids errors in resource estimation, since we don't need a file sampling job; any error in file sampling can cause the actual job, which has an SLA expectation, to fail.
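
As a rough check on the collision figure above, the standard birthday-bound approximation for a full 160-bit SHA-1 digest gives:

$$ n_{0.5} \approx \sqrt{2 N \ln 2} = \sqrt{2 \cdot 2^{160} \cdot \ln 2} \approx 1.18 \cdot 2^{80} \approx 1.4 \times 10^{24} $$

i.e., on the order of a septillion hashed keys in a single bucket before the probability of any collision reaches 50%, consistent with the "septillions of records" figure mentioned above.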

Collaborator:

To my mind, correctness is the deal-breaker here, however low the probability is. We can figure out how to keep memory requirements stable as a separate problem, for which there is already a working implementation in deltacat. As a side note, we have never seen (nor will we see) corrupted results written back due to hardware failures, since we have checksums in Parquet and S3 performs an integrity check on multipart uploads by default. However, we have had issues corrupting RCFs due to code bugs we introduced. In the current compactor this is already a risk: from the get-go we have hashing disabled for the majority of tables, and we recently introduced the SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED environment variable to disable it for all tables.

Collaborator:

Can we implement the option to toggle this instead of creating a TODO? I believe it's not too much effort (just don't call the hash() method on line 185). This PR already has a lot of tech debt, and we want to avoid creating more.
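
A minimal sketch of such a toggle, assuming the SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED environment variable mentioned earlier in this thread is reused for the converter (the flag name, its truthy values, and the call-site wiring are assumptions, not the merged code):

import os


def sha1_hashing_enabled() -> bool:
    # Assumed flag semantics, mirroring the compactor env var discussed above.
    disabled = os.environ.get("SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED", "")
    return disabled.strip().upper() not in ("TRUE", "1")


# At the call site (illustrative): only call hash() on the primary-key column
# when hashing is enabled; otherwise compare raw key values directly.
# pk_values = hash_fn(pk_values) if sha1_hashing_enabled() else pk_values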

Member Author:

It's not just about not calling hash(), right? It also changes how you estimate memory resources depending on which method you're using, since you'll get variable-length string primary keys if you get rid of the hash.

@raghumdani (Collaborator), Jan 23, 2025:

Correct, and that part is buggy even in the current state, as Daft would end up reading the entire pk column since you are not using the streaming reader. You already have a TODO for it.

Member Author:

My understanding of this conversation is that the entire pk column will not be kept in memory:

Miranda Zhu (Nov 15th, 2024 at 9:40 AM):
I wonder if there is a way to apply the UDF while downloading the columns? Specifically, we'd like to only keep the columns with the UDF applied in memory and discard the original columns; that could be really useful to us too.

Jay Chia:
The new execution engine should apply these in a pipelined fashion, so yes, it would happen automatically if you did something like:
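
(The snippet that followed in the quoted conversation was not captured above. A hypothetical sketch of the pipelined, hash-only projection being described, assuming Daft's read_parquet/udf/select APIs and an illustrative "primarykey" column and S3 path:)

import hashlib

import daft
from daft import col


@daft.udf(return_dtype=daft.DataType.binary())
def sha1_hash(keys: daft.Series):
    # Hash each primary-key value; only the hashed column gets materialized.
    return [hashlib.sha1(str(k).encode("utf-8")).digest() for k in keys.to_pylist()]


# Project only the hashed key column; with pipelined execution, the original
# variable-length string column is not kept resident in memory.
df = daft.read_parquet("s3://bucket/path/data-file.parquet").select(
    sha1_hash(col("primarykey")).alias("primarykey_hash")
)
hashed_keys = df.to_arrow()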

Collaborator:

Nice, you can add a TODO.

deltacat/compute/converter/utils/s3u.py (review thread resolved)
@Zyiqin-Miranda force-pushed the equality-to-position-job-session branch 9 times, most recently from 0550d6b to 7024222, on January 19, 2025 06:28
@Zyiqin-Miranda changed the title from "[WIP] Add eq-to-pos delete job session draft" to "[Tests passing] Add eq-to-pos delete job session draft" on Jan 19, 2025
@Zyiqin-Miranda changed the base branch from main to 2.0 on January 19, 2025 06:53
@Zyiqin-Miranda force-pushed the equality-to-position-job-session branch from 7024222 to b6bda23 on January 19, 2025 07:10
@Zyiqin-Miranda changed the title from "[Tests passing] Add eq-to-pos delete job session draft" to "[Tests passing] Add initial eq-to-pos delete job" on Jan 19, 2025
@Zyiqin-Miranda changed the title from "[Tests passing] Add initial eq-to-pos delete job" to "[Tests passing] [2.0] Add initial eq-to-pos delete job" on Jan 20, 2025
@Zyiqin-Miranda force-pushed the equality-to-position-job-session branch from b6bda23 to 79801d1 on January 21, 2025 23:59
@raghumdani (Collaborator) left a comment:

Overall, this looks better than the previous version. Please address a few more comments and ensure the GitHub checks are passing.

table_metadata=iceberg_table.metadata,
files_dict_list=to_be_added_files_dict_list,
)
commit_overwrite_snapshot(
Collaborator:

I may have missed this in the first review. We need to gracefully handle any error from commit conflicts, which would be resolved by a new job run.
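
For reference, one possible shape of that graceful handling, assuming PyIceberg's CommitFailedException is what surfaces on a conflicting commit and that the commit helper keeps roughly the call shape shown in the diff above (both are assumptions, not the merged implementation):

import time

from pyiceberg.exceptions import CommitFailedException


def commit_with_conflict_retry(commit_fn, iceberg_table, files_dict_list, max_attempts=3):
    # Retry a conflicting snapshot commit a few times before surfacing the
    # error for a new job run to resolve.
    for attempt in range(1, max_attempts + 1):
        try:
            commit_fn(
                table_metadata=iceberg_table.metadata,
                files_dict_list=files_dict_list,
            )
            return
        except CommitFailedException:
            if attempt == max_attempts:
                raise
            # Pick up the latest table metadata before retrying (assumed
            # refresh mechanism; reloading from the catalog also works).
            iceberg_table = iceberg_table.refresh()
            time.sleep(2 ** attempt)

# Usage (illustrative): commit_with_conflict_retry(commit_overwrite_snapshot, iceberg_table, to_be_added_files_dict_list)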

deltacat/compute/converter/model/convert_session_params.py (review thread resolved; outdated)
Comment on lines 34 to 35
def catalog(self):
return self["catalog"]
Collaborator:

Can we enrich the typing here?
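
For illustration, the kind of typing enrichment being requested might look like the sketch below; ConvertSessionParams and CatalogProperties are placeholder names (the params class appears to be dict-backed given the self["catalog"] access above, and the real catalog type is whatever the session actually stores):

from typing import Any, Dict, Optional


class ConvertSessionParams(Dict[str, Any]):
    # "CatalogProperties" stands in for the concrete catalog client type.
    @property
    def catalog(self) -> Optional["CatalogProperties"]:
        return self.get("catalog")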

@@ -0,0 +1,90 @@
from typing import Optional, Dict
from deltacat.exceptions import RetryableError
Collaborator:

FYI, I have assumed you've done the due diligence to ensure the estimation is accurate; I'm not going into greater depth on this business logic.

import s3fs


def get_credential():
Collaborator:

I have seen this method duplicated in multiple places.
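
One way to address the duplication is a single shared helper, sketched here under the assumption that the duplicated bodies resolve credentials via boto3's default provider chain (the module location and exact wiring are illustrative):

import boto3
import s3fs


def get_credential():
    # Resolve AWS credentials once via the default provider chain.
    return boto3.Session().get_credentials()


def get_s3_file_system():
    credential = get_credential()
    return s3fs.S3FileSystem(
        key=credential.access_key,
        secret=credential.secret_key,
        token=credential.token,
    )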

deltacat/compute/converter/utils/s3u.py (review thread resolved)
@raghumdani (Collaborator) left a comment:

Conditional approval, provided all the TODOs are taken as fast follow-ups.

@Zyiqin-Miranda (Member Author) commented Jan 25, 2025:

Converter features still to be implemented, tracked here for future PR references.

P0. Multiple identifier columns: column concatenation plus the relevant memory estimation change.
P0. Verify that the position deletes written out can be read by Spark, probably done in a unit test setup using the 2.0 Docker image.
P0. Switch to constructing equality delete tables using Spark, probably done in a unit test using the 2.0 Docker image.
P0. Any model changes we might need for the new 2.0 storage model, e.g., converting only certain partitions, reading a "delta", etc.
P0. Daft SHA1 hash support.

P1. Currently assuming one node can fit one hash bucket; adjust the number of data files downloaded in parallel in the convert function.
P1. Investigate PyIceberg REPLACE snapshot committing. The self-implemented REPLACE snapshot commit is currently not working as expected; "correct" here means being able to read the REPLACE snapshot using Spark. For now, we reuse the OVERWRITE snapshot committing strategy from PyIceberg.
P1. Investigate REPLACE snapshot committing using a starting sequence number to avoid conflicts. Not entirely sure we need this, given that a workaround may be feasible through the internal catalog implementation, so deprioritized to P1.
P1. Support merging/compacting small position delete files.
P1. Spark read performance for position deletes. Position deletes can be correctly matched to their corresponding data files by setting lower_bounds == upper_bounds == file_path, even with multiple data files; Spark does not scan the whole partition's position deletes into memory when merging on read.

@Zyiqin-Miranda force-pushed the equality-to-position-job-session branch from cf14f41 to 013e2f4 on January 27, 2025 04:37
@Zyiqin-Miranda merged commit 9d15ee5 into 2.0 on Jan 27, 2025
3 checks passed
@Zyiqin-Miranda (Member Author):

Merged this PR as all checks have passed.

@Zyiqin-Miranda (Member Author):

Tracked in issue.
