
Add CometRowToColumnar operator #119

Closed
advancedxy opened this issue Feb 27, 2024 · 10 comments · Fixed by #206
Labels
enhancement New feature or request

Comments

@advancedxy
Contributor

What is the problem the feature request solves?

I just tried Comet locally with spark-shell, and it turns out that Comet requires all of its input to be columnar. So it's not possible to test Comet with the following code:

spark.range(10, 1000, 10).selectExpr("id", "id + 1 as val").repartition(10, col("id"))

Adding a CometRowToColumnar operator on top of the leaf node (RangeExec) would make it easy to try Comet out.

In the long term, I think CometRowToColumnar could be used to wrap Spark's row-based source exec nodes, making all downstream operators columnar and letting them leverage Comet's columnar execution.
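To make the idea concrete, here is a minimal sketch of what such a wrapper could look like using only vanilla Spark APIs (Spark 3.2+ assumed). This is not Comet's actual design: the class name CometRowToColumnarExec, the fixed batch size, and the LongType-only conversion are illustrative assumptions.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Hypothetical wrapper: converts the row-based output of `child` into
// ColumnarBatches so downstream columnar operators can consume it.
case class CometRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
  private val batchSize = 4096 // arbitrary for this sketch

  override def output: Seq[Attribute] = child.output
  override def supportsColumnar: Boolean = true
  override protected def doExecute(): RDD[InternalRow] = child.execute()

  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val schema = StructType(output.map(a => StructField(a.name, a.dataType, a.nullable)))
    child.execute().mapPartitions { rows =>
      new Iterator[ColumnarBatch] {
        override def hasNext: Boolean = rows.hasNext
        override def next(): ColumnarBatch = {
          // A real implementation would reuse and close the vectors and handle
          // all data types; only LongType is handled to keep the sketch short.
          val vectors = OnHeapColumnVector.allocateColumns(batchSize, schema)
          var n = 0
          while (rows.hasNext && n < batchSize) {
            val row = rows.next()
            var i = 0
            while (i < vectors.length) {
              if (row.isNullAt(i)) vectors(i).putNull(n)
              else vectors(i).putLong(n, row.getLong(i))
              i += 1
            }
            n += 1
          }
          val batch = new ColumnarBatch(vectors.toArray[ColumnVector])
          batch.setNumRows(n)
          batch
        }
      }
    }
  }

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}

With something like this placed above RangeExec, the repartition example above could be re-checked with .explain() to see whether the downstream exchange is planned as a columnar Comet operator.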

Describe the potential solution

Add CometRowToColumnar operator

Additional context

No response

@advancedxy advancedxy added the enhancement New feature or request label Feb 27, 2024
@advancedxy
Contributor Author

cc @sunchao @viirya

@viirya
Member

viirya commented Feb 27, 2024

We have thought about this; it is on our plan list. As it is not as urgent as other Spark operators, it is not in the first batch of native operators we are implementing.

@advancedxy
Contributor Author

As it is not as urgent as other Spark operators

Do you have a prioritized list of operators to be supported? I think I can help a bit.

I can also help implement the CometRowToColumnar operator itself.

@viirya
Member

viirya commented Feb 27, 2024

I think our short-term goal is to have all TPC-H and TPC-DS queries fully enabled with Comet operators, so the operators found in those queries have higher priority. We have already implemented most of them, I think. One missing piece is the join operator, which I'm working on. It is close to done: it passes all Comet tests and the Spark tests, but we are still resolving a few test failures on TPC-DS.

And it's not only operators; native expressions are also something we want to cover.

We have also looked into finding anything unsupported, i.e., operators and expressions used in the TPC-DS and TPC-H queries that Comet doesn't cover yet. I believe Parth has been working on that list. cc @parthchandra

@sunchao
Member

sunchao commented Feb 27, 2024

@viirya feel free to break up the tasks for join when you think it is necessary, to improve the parallelism :) (I'm not sure whether some extra work is required for broadcast join atm).

@advancedxy you can also check the existing operators on the Spark side and see if there are some gaps that we should fill.

There are also a bunch of tasks on the DataFusion side, in particular around aggregate and join performance. To name a few:

I think implementing support for the operators is just the start; getting good performance out of them will also become very important in the future.

@sunchao
Member

sunchao commented Feb 27, 2024

One thing we don't support is InSubqueryExec, which is used in dynamic partition pruning and other things. We do support ScalarSubqueryExec at the moment.
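For illustration, a hedged spark-shell sketch (table and column names are made up, and whether dynamic partition pruning actually kicks in depends on configuration and statistics) that tends to surface both operators in the physical plan:

// Fact table partitioned on the join key, small dimension table with a
// selective filter: dynamic partition pruning usually shows up as a
// dynamicpruningexpression(... IN dynamicpruning#N) backed by InSubqueryExec.
spark.range(0, 10000).selectExpr("id", "id % 10 AS part")
  .write.mode("overwrite").partitionBy("part").saveAsTable("fact")
spark.range(0, 10).selectExpr("id AS part", "id * 2 AS v")
  .write.mode("overwrite").saveAsTable("dim")

spark.sql("SELECT * FROM fact JOIN dim ON fact.part = dim.part WHERE dim.v = 4").explain()

// A scalar subquery appears as ScalarSubquery in the plan instead.
spark.sql("SELECT * FROM fact WHERE id > (SELECT avg(v) FROM dim)").explain()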

@viirya
Member

viirya commented Feb 27, 2024

I'm not sure whether some extra work is required for broadcast join atm.

For broadcast join, we basically just need hash join + broadcast in Comet. I have draft work for hash join, and we already have broadcast. Once hash join is ready, it should be easy to build broadcast join by combining them.

add spilling for SMJ in DF (@viirya do we have an issue tracking this?)

Just created it: apache/datafusion#9359

@viirya
Member

viirya commented Feb 27, 2024

feel free to break up the tasks for join when you think it is necessary, to improve the parallelism

For SortMergeJoin support in Comet, it is an integral work item like the others we have finished or are working on, and it makes more sense to work on it as a whole (unless you want to break it out into serde code, the CometSortMergeJoinExec operator class, tests, etc. 😂).

Some preliminary tasks are already finished, e.g., relaxing the join-on expression types and adding join filter support.

Improving DataFusion's SortMergeJoin could be a separate task, as it is orthogonal to adding support in Comet. I'm not sure where the performance bottleneck is yet, but in the benchmark I ran earlier against Spark it doesn't perform better, just about the same.

SortMergeJoin spilling support is another separate task; I created a ticket for that.
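As a side note for anyone exercising or benchmarking these join paths from spark-shell, a small hedged sketch (the data sizes are arbitrary, and the resulting plan still depends on AQE and other settings):

import org.apache.spark.sql.functions.broadcast

val left  = spark.range(0, 1000000).selectExpr("id", "id % 100 AS k")
val right = spark.range(0, 1000).selectExpr("id AS k", "id * 3 AS v")

// Disable auto-broadcast so the planner tends to pick SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
left.join(right, "k").explain()

// Explicitly broadcast the small side to get a broadcast hash join.
left.join(broadcast(right), "k").explain()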

@advancedxy
Contributor Author

@advancedxy you can also check the existing operators on the Spark side and see if there are some gaps that we should fill.

Good point. I am actively evaluating Comet and checking whether things work or not, and I may create issues when I find operators or expressions that aren't supported yet.

But I do agree with @viirya's point: we should fully support the TPC-H and TPC-DS queries in the short term and prioritize the operators and expressions they use, for both coverage and performance.

@advancedxy
Contributor Author

FYI, I am planning to work on this next week.

Of course, if others are also interested in this, feel free to comment here and take it over; I can help review and/or provide guidance if needed.
