
Support for anti-join based reader for Hive ACID tables in Spark #25

Open
amoghmargoor opened this issue Dec 16, 2019 · 1 comment

@amoghmargoor
Collaborator

Currently, the ACID reader uses shaded Hive ORC ACID readers to read Hive ACID tables. We can definitely improve performance if we move to native readers for those tables. Especially in the case of Parquet, Spark's native Parquet readers are optimized and we don't want to miss out on that. Another reason is that we need to convert every row from the Hive structure to InternalRow, which might add a penalty too.

Following are the two important objectives for this exercise:

Performance: Figure out a performant way of using the native Spark readers.

Maintainability: Do not make changes to Spark readers for Hive ACID. We would not like to make any changes to the Spark readers or keep a fork of them that needs to be maintained later.

One of the solutions we plan to evaluate is an anti-join between the base_files + delta files (let's call it the base relation) and the delete_delta files (let's call it the delete relation). An anti-join of the base and delete relations on (rowId, bucket, transactionId) gives us the rows to read, i.e.,

AntiJoin(base, delete)
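
In Spark DataFrame terms this could look roughly like the sketch below. Here baseDf (base_files + delta files), deleteDf (delete_delta files) and the key column names are placeholders standing in for the (rowId, bucket, transactionId) tuple above, not names from any actual implementation:

// Rough sketch only: baseDf/deleteDf and the key column names are assumed placeholders.
import org.apache.spark.sql.DataFrame

def antiJoin(baseDf: DataFrame, deleteDf: DataFrame): DataFrame = {
  // LEFT ANTI keeps only the base rows that have no matching delete record,
  // i.e. AntiJoin(base, delete) from the description above.
  baseDf.join(deleteDf, Seq("rowId", "bucket", "transactionId"), "left_anti")
}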

However, a Sort Merge Join introduces an extra shuffle that might cause a performance issue, and if the delete relation is larger than the broadcast threshold Spark will fall back to SMJ. One way to ensure that broadcast happens more frequently is to split the base and delete relations by bucket id and do the anti-joins per bucket id, i.e.,

AntiJoin(base, delete) = Union(AntiJoin(base_bucket_1, delete_bucket_1), AntiJoin(base_bucket_2, delete_bucket_2), … AntiJoin(base_bucket_n, delete_bucket_n))

This way the join is split into smaller per-bucket joins, making sure that broadcast joins come into play here (see the sketch below).
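
A rough sketch of the per-bucket split, under the same assumptions as above (bucketIds, baseDf, deleteDf and the key names are placeholders); filtering both sides on the same bucket id keeps each delete-side slice small enough for the broadcast hint to apply:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

def bucketedAntiJoin(baseDf: DataFrame, deleteDf: DataFrame, bucketIds: Seq[Int]): DataFrame = {
  val keys = Seq("rowId", "bucket", "transactionId")
  val perBucket = bucketIds.map { b =>
    val base   = baseDf.filter(col("bucket") === b)
    val delete = deleteDf.filter(col("bucket") === b)
    // broadcast() hints Spark to use a broadcast hash join for the small per-bucket delete slice
    base.join(broadcast(delete), keys, "left_anti")
  }
  // AntiJoin(base, delete) == union of the per-bucket anti-joins
  perBucket.reduce(_ union _)
}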

@amoghmargoor
Collaborator Author

A couple of issues here:

  1. One issue specific to ORC is that only the ACID-aware reader currently understands the ACID file format. So if we just do this:

spark.read.format("orc").load(".../warehouse/acidtbl/delta_0000002_0000002_0000/bucket_00000")
the following exception is thrown in SchemaEvolution:
if (this.readerIncluded != null &&
    this.readerIncluded.length + this.readerColumnOffset != this.readerSchema.getMaximumId() + 1) {
  throw new IllegalArgumentException("Include vector the wrong length: "
      + this.readerSchema.toJson() + " with include length " + this.readerIncluded.length);
}

This means the delta files are not even accessible to normal readers, i.e., a normal reader cannot read them like ordinary ORC files with plain row ids.
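
For reference, the row layout that an ACID-aware reader sees in those delta files wraps the user columns inside a struct next to the ACID metadata columns. The StructType below is only an illustration of that layout (the user columns id/name are made up); as the exception above shows, handing a schema to the plain ORC reader does not by itself make the delta file readable.

import org.apache.spark.sql.types._

// Illustrative only: standard Hive ACID ORC layout, with placeholder user columns.
val acidSchema = StructType(Seq(
  StructField("operation", IntegerType),
  StructField("originalTransaction", LongType),
  StructField("bucket", IntegerType),
  StructField("rowId", LongType),
  StructField("currentTransaction", LongType),
  StructField("row", StructType(Seq(      // the actual table columns live here
    StructField("id", IntegerType),
    StructField("name", StringType)
  )))
))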

  2. What should be done when rowId has not been determined for the base files yet?

@amoghmargoor amoghmargoor self-assigned this Mar 27, 2020
amoghmargoor pushed a commit to amoghmargoor/spark-acid that referenced this issue Jun 16, 2020
fix: dev: SPAR-4319: Added blobstore commit marker while doing insert overwrite. Also set hive conf when reading from spark.

Approved-by: Amogh Margoor <[email protected]>