Skip to content

Commit

Permalink
[CDAP-21075] Support DP 2.2(spark 3.5.1) by checking if the new metho…
Browse files Browse the repository at this point in the history
…d is present or not.
  • Loading branch information
sahusanket committed Jan 16, 2025
1 parent 6fb8647 commit 910ad98
Showing 1 changed file with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.cdap.cdap.etl.spark.function.FunctionCache;
import io.cdap.cdap.etl.spark.join.JoinExpressionRequest;
import io.cdap.cdap.etl.spark.join.JoinRequest;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
Expand Down Expand Up @@ -108,12 +110,44 @@ public SparkCollection<T> join(JoinExpressionRequest joinRequest) {
@Override
public DataframeCollection toDataframeCollection(Schema schema) {
StructType sparkSchema = DataFrames.toDataType(schema);
ExpressionEncoder<Row> encoder = RowEncoder.apply(sparkSchema);
ExpressionEncoder<Row> encoder = getRowEncoder(sparkSchema);
Dataset<StructuredRecord> ds = (Dataset<StructuredRecord>) getDataset();
MapFunction<StructuredRecord, Row> converter = r -> DataFrames.toRow(r, sparkSchema);
return new DataframeCollection(schema, ds.map(converter, encoder),
sec, jsc, sqlContext, datasetContext, sinkFactory, functionCacheFactory);
}

/**
* This is required to handle breaking changes between spark 3.3.2 (Dataproc 2.1) to 3.5.1 (Dataproc 2.2).
* And we need to support both.
* Here we are trying to check if the new method introduced in 3.5.1 exists or not, and based on that we
* invoke the new method or the old one.
*/
public static ExpressionEncoder<Row> getRowEncoder(StructType sparkSchema) {

StringBuilder errorStrBuilder = new StringBuilder("Failed to load a suitable Encoder dynamically. Errors : ");

try {
// Check if the `RowEncoder` class has the `encoderFor` method. This is for spark 3.5
Method encoderForMethod = RowEncoder.class.getMethod("encoderFor", StructType.class);
Object agnosticEncoderObj = encoderForMethod.invoke(null, sparkSchema);

Method applyMethod = ExpressionEncoder.class.getMethod("apply",
Class.forName("org.apache.spark.sql.catalyst.encoders.AgnosticEncoder"));

return (ExpressionEncoder<Row>)applyMethod.invoke(null, agnosticEncoderObj);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | ClassNotFoundException e) {
errorStrBuilder.append(System.lineSeparator()).append(e.getMessage());
}

try {
// If it reaches here, meaning it should have the older method for 3.3.2 and lower.
Method applyMethod = RowEncoder.class.getMethod("apply", StructType.class);
return (ExpressionEncoder<Row>) applyMethod.invoke(null, sparkSchema);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
errorStrBuilder.append(System.lineSeparator()).append(e.getMessage());
}

throw new RuntimeException(errorStrBuilder.toString());
}
}

0 comments on commit 910ad98

Please sign in to comment.