[Tests passing] [2.0] Add initial eq-to-pos delete job #356
Conversation
Branch updated: 1c30d06 → 3d5149d
First version of the converter, with a test to verify correctness, is working here.
Thanks for putting this full implementation together. Great work so far. A couple of things I think would be useful here:
- Modularize all the invocations to the catalog client so that we can independently write unit tests for it, and change it when internal catalog support is available.
- I would not compare the hashes here. Although the probability is low, collisions can theoretically occur and we cannot detect or recover from them.
- We can add e2e functional tests. I only see one sanity test at the moment.
- Would it be simpler to have a separate package for this implementation and use deltacat as a dependency in that package, since there is only a one-way dependency? I fear we may create highly coupled functions over time, making maintenance of deltacat (with DeltaCAT 2.0) harder.
- Move all the short-term hacks, like the overrides, into a _private module to emphasize the danger of taking any dependency on those functions.
    data_file_table["primarykey"],
    equality_delete_table["primarykey"],
)
positional_delete_table = data_file_table.filter(equality_deletes)
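For context, a minimal sketch of the membership-mask-and-filter pattern in the snippet above. PyArrow is assumed, and since the call that builds `equality_deletes` is truncated in the quoted diff, the use of `pc.is_in` here is an illustration rather than the PR's exact code:

```python
import pyarrow as pa
import pyarrow.compute as pc

data_file_table = pa.table({"primarykey": ["a", "b", "c", "d"], "pos": [0, 1, 2, 3]})
equality_delete_table = pa.table({"primarykey": ["b", "d"]})

# Boolean mask: True for data-file rows whose primary key appears in the
# equality-delete table.
equality_deletes = pc.is_in(
    data_file_table["primarykey"],
    value_set=equality_delete_table["primarykey"],
)

# Rows matching an equality delete are kept; their file positions can then be
# written out as position deletes.
positional_delete_table = data_file_table.filter(equality_deletes)
```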
This looks like a table we get after filtering all the rows matching equality delete values.
Are you suggesting a naming change here?
Yes. The name doesn't reflect what it really is.
)

from deltacat.utils.daft import _get_s3_io_config
# TODO: Use Daft SHA1 hash instead to minimize probability of data corruption
As long as we use SHA-1, we are at the mercy of probability. Although the chances are low, a collision can happen and cause correctness issues. I don't think we should be using any kind of hashing here to check for equality.
I think using SHA-1 is still the right choice here to keep memory requirements predictable. Although a probability of collision exists, some probability of data corruption always exists that may be outside of your control (beyond, perhaps, eventually consistent detection and rectification mechanisms).
In this case, the chance of a SHA-1 collision is likely lower than the probability of introducing corrupt results anyway due to, say, writing back results from memory corrupted by a hardware failure (e.g., given the non-zero frequency of these types of errors observed while running compaction and similar jobs at scale internally at Amazon over the past year).
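To make the memory argument concrete, here is a minimal sketch (plain hashlib, not the PR's Daft-based code path) of why hashing keeps per-row memory predictable: keys of any length map to fixed 20-byte digests.

```python
import hashlib

# Variable-length primary keys of unpredictable size...
primary_keys = ["a", "customer-0000123456789", "x" * 10_000]

# ...all hash to a fixed 20-byte SHA-1 digest, so memory per key is constant
# regardless of the original key length.
digests = [hashlib.sha1(pk.encode("utf-8")).digest() for pk in primary_keys]
assert all(len(d) == 20 for d in digests)
```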
If you want to choose a middle-ground approach, perhaps you should choose a record count at which the probability of collision is appreciably high enough to necessitate switching (e.g., perhaps in the septillions of records, assuming we ever manage datasets that get there).
+1 for continuing to use SHA-1 as the hashing approach here.
Regarding @raghumdani's concern about using SHA-1 for primary key lookup: by the birthday problem, we only reach a 50% collision probability at about 1.2 * 10^21 records (1.2 sextillion records in ONE bucket), which is not a reasonable expectation of our current data volume. So I don't think that will be a legitimate concern for us in the near future. (A quick calculator for this bound is sketched after the list below.)
The pros of hashing the primary keys are:
- Simplified memory estimation logic
- Avoiding OOM
- Efficient cluster usage: no need to account for variable-length string primary keys
- Avoiding errors in resource estimation, since we don't need a file sampling job; any error in file sampling can cause the actual job, which has an SLA expectation, to fail
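A small birthday-bound calculator to make the figures above reproducible. Note that the record count at which collisions become likely depends on how many digest bits the lookup effectively uses; the full 160-bit SHA-1 width below is an assumption, and at that width the 50% point sits around 1.4 * 10^24 records, with the probability at 1.2 * 10^21 records being far smaller.

```python
import math

def collision_probability(num_records: float, hash_bits: int) -> float:
    """Approximate P(at least one collision) among num_records uniform hashes."""
    space = 2.0 ** hash_bits
    return 1.0 - math.exp(-(num_records ** 2) / (2.0 * space))

def records_for_half_collision(hash_bits: int) -> float:
    """Record count at which the collision probability reaches ~50%."""
    return math.sqrt(2.0 * math.log(2.0) * (2.0 ** hash_bits))

print(f"{collision_probability(1.2e21, 160):.2e}")   # ~4.93e-07
print(f"{records_for_half_collision(160):.2e}")      # ~1.42e+24
```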
To my mind, correctness is the deal-breaker here, however low the probability is. We can figure out how to keep memory requirements stable as a separate problem, for which there is already a working implementation in deltacat. As a side note, we have never seen, nor will we see, writing back corrupted results due to hardware failures, since we have checksums in Parquet and S3 performs integrity checks on multipart uploads by default. However, we have had issues corrupting RCFs due to code bugs we had introduced. In the current compactor, this is already a risk. From the get-go we have had it disabled for the majority of tables, and we recently introduced the env variable SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED to disable it for all tables.
Can we implement the option to toggle this instead of creating a TODO? I believe it's not too much effort (just don't call the hash() method on line 185). This PR already has a lot of tech debt and we want to avoid creating more.
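A hedged sketch of such a toggle, reusing the env variable named above; the helper name and call site are illustrative, not taken from the PR.

```python
import hashlib
import os
from typing import Union

def maybe_hash_primary_key(pk: str) -> Union[bytes, str]:
    """Return a fixed-width SHA-1 digest unless hashing is disabled via env."""
    if os.environ.get("SHA1_HASHING_FOR_MEMORY_OPTIMIZATION_DISABLED"):
        # Compare raw (variable-length) primary keys; memory estimation must
        # then account for actual key sizes.
        return pk
    return hashlib.sha1(pk.encode("utf-8")).digest()
```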
It's not just about not calling hash(), right? It changes how you estimate your memory resources depending on which method you're using, since you'll get variable-length string primary keys if you get rid of the hash, right?
Correct, and that part is buggy even in the current state, as Daft would end up reading the entire pk column since you are not using the streaming reader. You already have a TODO for it.
My understanding of this conversation is that the entire pk column will not be kept in memory:
Miranda Zhu (Nov 15th, 2024 at 9:40 AM): I wonder if there is a way to apply the UDF while downloading the columns? Specifically, we'd like to only keep the columns with the UDF applied in memory and discard the original columns; that could be really useful to us too.
Jay Chia: The new execution engine should apply these in a pipelined fashion, so yes, it would happen automatically if you did something like:
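The snippet that followed in the quoted thread is not included above. Purely as an illustration of the pipelined pattern being described, and assuming Daft's read_parquet/col APIs plus the expression hash() referenced elsewhere in this review, it might look roughly like:

```python
import daft
from daft import col

df = daft.read_parquet("s3://bucket/table/data/*.parquet")  # hypothetical path
df = df.select(col("primarykey").hash().alias("primarykey_hash"))
# With the pipelined execution engine, only the derived hash column needs to
# be materialized; the raw primary key column is not retained in memory.
```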
Nice, you can add a TODO.
Branch updated: 0550d6b → 7024222
Branch updated: 7024222 → b6bda23
Branch updated: b6bda23 → 79801d1
Overall, this looks better than the previous version. Please address the few remaining comments and ensure the GitHub checks are passing.
    table_metadata=iceberg_table.metadata,
    files_dict_list=to_be_added_files_dict_list,
)
commit_overwrite_snapshot(
I may have missed this in the first review. We need to gracefully handle any error from commit conflicts, which will be resolved by a new job run.
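A hedged sketch of what graceful handling around the snapshot commit could look like, reusing the names from the diff above. The exception type assumes pyiceberg is the catalog client, and the "defer to the next job run" policy is just one possible resolution.

```python
import logging

from pyiceberg.exceptions import CommitFailedException

logger = logging.getLogger(__name__)

try:
    commit_overwrite_snapshot(
        table_metadata=iceberg_table.metadata,
        files_dict_list=to_be_added_files_dict_list,
    )
except CommitFailedException as e:
    # Another writer won the optimistic-concurrency race; a subsequent converter
    # run will reprocess the remaining equality deletes against the new state.
    logger.warning(f"Snapshot commit conflict; deferring to the next run: {e}")
```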
def catalog(self):
    return self["catalog"]
Can we enrich the typing here?
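A hedged sketch of the richer typing being requested; the class name and the assumption that the stored value is a pyiceberg Catalog are illustrative, not taken from the PR.

```python
from typing import Any, Dict, Optional

from pyiceberg.catalog import Catalog

class ConverterSessionParams(Dict[str, Any]):  # hypothetical name
    @property
    def catalog(self) -> Optional[Catalog]:
        # A typed property lets type checkers catch misuse even though the
        # value still lives in the underlying dict.
        return self.get("catalog")
```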
from typing import Optional, Dict
from deltacat.exceptions import RetryableError
FYI, I have assumed you've done the due diligence to ensure the estimation is accurate; I'm not going into greater depth on this business logic.
import s3fs


def get_credential():
I have seen this method duplicated in multiple places.
Conditional approval, provided all the TODOs are taken up as fast follow-ups.
Converter to-be-implemented feature list, tracked here for future PR reference:
- P0: Support multiple identifier columns (column concatenation, see the sketch below) plus the relevant memory estimation changes.
- P1: Currently assuming one node can fit one hash bucket; adjust the number of data files downloaded in parallel in the convert function.
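A hedged sketch of the P0 item: concatenating multiple identifier columns into a single composite key before hashing. PyArrow is assumed, and the column names and separator are illustrative.

```python
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    "order_id": ["o1", "o2", "o3"],
    "line_id": ["1", "2", "1"],
})

# Join the identifier columns element-wise; the last argument is the separator.
composite_key = pc.binary_join_element_wise(
    table["order_id"], table["line_id"], "|"
)
# The composite key can then be hashed like a single primary key column, so the
# memory estimation logic only needs to account for one fixed-width digest.
```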
Branch updated: cf14f41 → 013e2f4
Merged the PR as all checks have passed.
Tracked in an issue.
For overall high-level feedback purposes.