Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-21075] Support DP 2.2(spark 3.5.1) by checking if the new method is present or not. #15808

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

sahusanket
Copy link
Contributor

@sahusanket sahusanket commented Jan 16, 2025

We need to support both Dataproc 2.2 ( spark 3.5.1) and 2.1 (spark 3.3.2).

There were few refactoring and breaking API changes in SPARK-44531 .


Changed the spark version in CDAP to 3.5.1 just to check compilation failures and it resulted in only 1 Error in out code base i.e. OpaqueDatasetCollection#toDataframeCollection

In which we were using RowEncoder.apply(StructType) which is no longer present.

Also testing with DP 2.2 with a suitable pipeline (Dedup --> Join) gives java.lang.NoSuchMethodError for that code path, rest of the testing went fine.


Based on above points, we can say this is the only code path that requires change.


Fix in this pr:
Using reflection we are first determining if new method is present, if so , it is Spark 3.5.1 and we use the following code via reflection to achieve the Encoder :

AgnosticEncoder ae = RowEncoder.encoderFor(schema);
//Convert to expression
return ExpressionEncoder.apply(ae);

@sahusanket sahusanket self-assigned this Jan 16, 2025
@sahusanket sahusanket added the build Triggers github actions build label Jan 16, 2025
@sahusanket sahusanket force-pushed the CDAP-21075_spark_reflection branch from cec4db9 to 910ad98 Compare January 16, 2025 15:11
@sahusanket sahusanket requested a review from tivv January 16, 2025 15:11
Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
0.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

* Here we are trying to check if the new method introduced in 3.5.1 exists or not, and based on that we
* invoke the new method or the old one.
*/
public static ExpressionEncoder<Row> getRowEncoder(StructType sparkSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please cache the Method objects retrieved? Instrospection is a pretty heavy operation, we can do it once and then simply do "invoke".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build Triggers github actions build
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants