awswrangler.athena.to_iceberg not supporting synchronous/parallel lambda instances #2651

Open
B161851 opened this issue Jan 31, 2024 · 14 comments

@B161851

B161851 commented Jan 31, 2024

Describe the bug

wr.athena.to_iceberg(
    df=df,
    database='test_database',
    table='my_table2',
    table_location='s3://bucket-testing1/my_table2/',
    temp_path='s3://bucket-testing1/temp_path/',
    keep_files=True,
)

For parallel writing, if keep_files=True the result is duplicates. I tried appending a nanosecond timestamp to the temporary path so that it is unique, but now I get "ICEBERG_COMMIT_ERROR".
If keep_files=False, ingesting Iceberg data in parallel gives "HIVE_CANNOT_OPEN_SPLIT NoSuchKey Error"; we observed that with keep_files=False the library removes the entire temp_path from S3, which leads to this error.

Writing to the Iceberg table with awswrangler from Lambda is therefore not working.
So, how can we overcome the above issues when writing to an Iceberg table in parallel from Lambda using awswrangler?

How to Reproduce

wr.athena.to_iceberg(
    df=df,
    database='test_database',
    table='my_table2',
    table_location='s3://bucket-testing1/my_table2/',
    temp_path='s3://bucket-testing1/temp_path/',
    keep_files=False,
)

We observed that with keep_files=False the library removes the entire temp_path from S3, which results in "HIVE_CANNOT_OPEN_SPLIT NoSuchKey Error".
If the library removed only the specific Parquet files it wrote, instead of the entire temp_path, I think this error could be avoided.

Expected behavior

No response

Your project

No response

Screenshots

No response

OS

Win

Python version

3.8

AWS SDK for pandas version

12

Additional context

No response

@B161851 B161851 added the bug Something isn't working label Jan 31, 2024
@B161851 B161851 closed this as completed Jan 31, 2024
@B161851 B161851 reopened this Jan 31, 2024
@kukushking
Contributor

Hi @B161851, if you are inserting concurrently, you need to make sure temp_path is unique and empty for each run. Also, when you got ICEBERG_COMMIT_ERROR, did the table you are inserting into already exist? It might be a race condition due to multiple runs trying to create the table. Checking.
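
For reference, a minimal sketch of what a unique, empty temp_path per invocation could look like (bucket and prefix names are placeholders, and df stands for your data frame):

import uuid

import awswrangler as wr

# df = ...  # your pandas DataFrame

# Build a temp_path that no other invocation shares
temp_path = f"s3://bucket-testing1/temp/{uuid.uuid4()}/"

# Optional: make sure the prefix is empty before writing
wr.s3.delete_objects(path=temp_path)

wr.athena.to_iceberg(
    df=df,
    database="test_database",
    table="my_table2",
    table_location="s3://bucket-testing1/my_table2/",
    temp_path=temp_path,
    keep_files=True,
)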

@Salatich

Salatich commented Feb 13, 2024

@kukushking hi, I'm facing the same issue, even with just two concurrent writers (Lambdas). The table exists. I'm trying to perform an upsert (MERGE INTO) operation. In my case the upserts happen even on different partitions (different parts of the table), so I don't think it's a race condition.

 tmp_table_name = f"my_table_{uuid.uuid4()}".replace("-", "_")
 tmp_path = f"s3://my_bucket/{tmp_table_name}"
 wr.athena.to_iceberg(
                    df=processed_df,
                    database='my_database',
                    table="my_table",
                    table_location="s3://my_bucket/my_table",
                    temp_path=tmp_path,
                    partition_cols=["col1", "col2"],
                    merge_cols=["col1","col2","col3"],
                    keep_files=False
                )

ICEBERG_COMMIT_ERROR: Failed to commit Iceberg update to table

@vibe
Contributor

vibe commented Feb 20, 2024

Just wanted to bump this issue up as well.

The particular use case is an upsert very similar to @Salatich's last comment.
The infrastructure is an MSK trigger on a Lambda.

I've had to lock Lambda concurrency to 1 to avoid the ICEBERG_COMMIT_ERROR errors.
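
In case it helps anyone doing the same, a minimal sketch of setting that reserved concurrency with boto3 (the function name is a placeholder):

import boto3

lambda_client = boto3.client("lambda")

# Cap the writer function at a single concurrent execution so Iceberg commits are serialized
lambda_client.put_function_concurrency(
    FunctionName="my-iceberg-writer",  # placeholder name
    ReservedConcurrentExecutions=1,
)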


Marking this issue as stale due to inactivity. This helps our maintainers find and focus on the active issues. If this issue receives no comments in the next 7 days it will automatically be closed.

@Salatich

bump

@channingdata

bump. addressing this feature will be very helpful

@kukushking
Contributor

kukushking commented May 13, 2024

All, this looks like a service-side issue. Please raise a support request.

@ChanTheDataExplorer @Salatich @vibe is it also HIVE_CANNOT_OPEN_SPLIT NoSuchKey or a different exception code? This error code may correspond to multiple different root causes. Any additional info would be appreciated, e.g. what is the size of your dataset? What is the key that is causing the issue, and the corresponding data frame? Does this reproduce consistently?

@channingdata

On my side it is just ICEBERG_COMMIT_ERROR.

@peterklingelhofer

peterklingelhofer commented May 21, 2024

We're seeing a lot of ICEBERG_COMMIT_ERROR with the latest awswrangler, Athena, and Glue when attempting parallel writes. Changing partition sizes so that no writes ever merge into the same partition does not alleviate the problem. There is little documentation on the error: https://repost.aws/knowledge-center/athena-iceberg-table-error

@Siddharth-Latthe-07

When writing to an Iceberg table in parallel with awswrangler from AWS Lambda, there are some specific considerations and configurations needed to avoid issues like duplicates, ICEBERG_COMMIT_ERROR, and HIVE_CANNOT_OPEN_SPLIT NoSuchKey.
Observations:

1. Handling duplicates: appending a nanosecond timestamp to the temporary path can make each parallel write unique, but it may still lead to ICEBERG_COMMIT_ERROR due to conflicts when committing changes to the table metadata.
2. keep_files configuration: if keep_files=False, the library removes the entire temp_path, which may cause HIVE_CANNOT_OPEN_SPLIT NoSuchKey when concurrent operations try to access files that no longer exist.

Suggested approach:
1. Generate unique temporary paths: use a unique identifier for each Lambda invocation so that temporary paths do not conflict.
2. Handle file cleanup manually: instead of relying on the keep_files parameter, clean up the temporary files yourself after the write operation succeeds.

Sample code snippet:

import time
import uuid

import awswrangler as wr
import boto3

def write_to_iceberg(df, database, table, table_location, bucket):
    # Unique temp path per invocation so parallel writers never share a prefix
    unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'

    try:
        wr.athena.to_iceberg(
            df=df,
            database=database,
            table=table,
            table_location=table_location,
            temp_path=unique_temp_path,
            keep_files=True,  # keep files so the library does not wipe the whole temp area
        )
    finally:
        # Clean up only this invocation's temporary files
        s3 = boto3.resource('s3')
        bucket_resource = s3.Bucket(bucket)  # do not shadow the bucket name string
        prefix = unique_temp_path.replace(f's3://{bucket}/', '')
        for obj in bucket_resource.objects.filter(Prefix=prefix):
            obj.delete()

# Example usage
df = ...  # your DataFrame
write_to_iceberg(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')

Hope this helps; do let me know if there are similar issues.
Thanks

@Acehaidrey

@Siddharth-Latthe-07 I want to check with you: your recommendations do not solve the ICEBERG_COMMIT_ERROR in my case, where each partition is written to by a different parallel process. Do you have any ideas on how to resolve such a case?

Also @peterklingelhofer / @Salatich, did you have any luck with this issue?

@Siddharth-Latthe-07

@Acehaidrey Here are some additional strategies you can look into:

1. Coordination with commit locks: one approach to avoid commit conflicts is to implement a coordination mechanism using locks. Sample code:
import time
import uuid

import awswrangler as wr
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')

LOCK_BUCKET = 'your-lock-bucket'

def acquire_lock(lock_key):
    # Best-effort lock: only take it if the lock object does not already exist.
    # A head-then-put is not atomic, so two writers can still occasionally race;
    # treat this as a mitigation, not a guarantee.
    try:
        s3.head_object(Bucket=LOCK_BUCKET, Key=lock_key)
        return False  # lock object exists, another writer holds it
    except ClientError:
        s3.put_object(Bucket=LOCK_BUCKET, Key=lock_key, Body=b'')
        return True

def release_lock(lock_key):
    s3.delete_object(Bucket=LOCK_BUCKET, Key=lock_key)

def write_to_iceberg(df, database, table, table_location, bucket):
    unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
    lock_key = f'iceberg_commit_lock/{table}'

    # Try to acquire the lock
    while not acquire_lock(lock_key):
        time.sleep(0.1)  # wait before retrying

    try:
        wr.athena.to_iceberg(
            df=df,
            database=database,
            table=table,
            table_location=table_location,
            temp_path=unique_temp_path,
            keep_files=True,  # keep files to avoid NoSuchKey errors
        )
    finally:
        # Clean up this invocation's temporary files manually
        s3_resource = boto3.resource('s3')
        bucket_resource = s3_resource.Bucket(bucket)
        prefix = unique_temp_path.replace(f's3://{bucket}/', '')
        for obj in bucket_resource.objects.filter(Prefix=prefix):
            obj.delete()

        # Release the lock
        release_lock(lock_key)

# Example usage
df = ...  # your DataFrame
write_to_iceberg(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')
2. Using AWS Glue for coordination: create a Glue job that reads the data from S3 and writes to Iceberg, then schedule the Glue job or trigger it based on S3 events.

3. Use Iceberg's built-in conflict resolution: Apache Iceberg provides built-in mechanisms to handle commit conflicts.

4. Implementing retries with exponential backoff: add retries with exponential backoff. Sample code:

import time
import uuid

import awswrangler as wr
import boto3
from botocore.exceptions import ClientError

def exponential_backoff(retries):
    # Cap the backoff at 60 seconds
    return min(60, 2 ** retries)

def write_to_iceberg_with_retries(df, database, table, table_location, bucket, max_retries=5):
    unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
    retries = 0

    while retries < max_retries:
        try:
            wr.athena.to_iceberg(
                df=df,
                database=database,
                table=table,
                table_location=table_location,
                temp_path=unique_temp_path,
                keep_files=True,
            )
            break
        except ClientError:
            # Depending on the awswrangler version, a failed Iceberg commit may be
            # raised as a different exception type; widen this clause if needed.
            retries += 1
            if retries == max_retries:
                raise
            time.sleep(exponential_backoff(retries))
        finally:
            # Clean up this attempt's temporary files manually
            s3_resource = boto3.resource('s3')
            bucket_resource = s3_resource.Bucket(bucket)
            prefix = unique_temp_path.replace(f's3://{bucket}/', '')
            for obj in bucket_resource.objects.filter(Prefix=prefix):
                obj.delete()

# Example usage
df = ...  # your DataFrame
write_to_iceberg_with_retries(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')

Combining unique temporary paths with proper locking mechanisms, using AWS Glue for coordination, leveraging Iceberg's conflict resolution, and implementing retries can help overcome the ICEBERG_COMMIT_ERROR and other issues during parallel writes.
Hope this helps,
Thanks

@Salatich

Salatich commented Aug 5, 2024

Hi @Acehaidrey, unfortunately no luck. In my approach I use unique names for the temp_paths and update different partitions (so there is no race, I believe). Also, the table exists, so there is no race condition on creating the table.

I'm using exponential backoff. It kind of helps, but I constantly see retry warnings with ICEBERG_COMMIT_ERROR in my Lambdas.

Also, this statement confuses me (from https://repost.aws/knowledge-center/athena-iceberg-table-error):
[screenshot of the relevant statement from the linked article]
Does it mean that Iceberg doesn't support parallel writes?

@Acehaidrey

Thank you @Siddharth-Latthe-07. I think this will slow down the program indeed. But it seems the answer is to limit the parallelism, which isn't the solution we want to go towards :/
