From 910ad98d7f4673169013efd9eea15c4b196a9e9e Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Thu, 16 Jan 2025 20:30:09 +0530
Subject: [PATCH] [CDAP-21075] Support DP 2.2(spark 3.5.1) by checking if the new method is present or not.

---
 .../spark/batch/OpaqueDatasetCollection.java  | 64 ++++++++++++++++++-
 1 file changed, 63 insertions(+), 1 deletion(-)

diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java
index 371bbef134e..85287bca15c 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/OpaqueDatasetCollection.java
@@ -26,6 +26,8 @@
 import io.cdap.cdap.etl.spark.function.FunctionCache;
 import io.cdap.cdap.etl.spark.join.JoinExpressionRequest;
 import io.cdap.cdap.etl.spark.join.JoinRequest;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
@@ -108,12 +110,72 @@ public SparkCollection join(JoinExpressionRequest joinRequest) {
   @Override
   public DataframeCollection toDataframeCollection(Schema schema) {
     StructType sparkSchema = DataFrames.toDataType(schema);
-    ExpressionEncoder encoder = RowEncoder.apply(sparkSchema);
+    ExpressionEncoder encoder = getRowEncoder(sparkSchema);
     Dataset ds = (Dataset) getDataset();
     MapFunction converter = r -> DataFrames.toRow(r, sparkSchema);
     return new DataframeCollection(schema, ds.map(converter, encoder), sec, jsc, sqlContext, datasetContext,
         sinkFactory, functionCacheFactory);
   }
+
+  /**
+   * Creates the row {@link ExpressionEncoder} for the given schema in a way that works across the
+   * breaking change between spark 3.3.2 (Dataproc 2.1) and 3.5.1 (Dataproc 2.2), both of which need
+   * to be supported.
+   *
+   * <p>The factory method is resolved reflectively at runtime: the method introduced for spark 3.5
+   * ({@code RowEncoder.encoderFor}, passed through {@code ExpressionEncoder.apply}) is tried first,
+   * and if it cannot be used the older {@code RowEncoder.apply} is tried instead.
+   *
+   * @param sparkSchema the spark schema of the rows to encode
+   * @return the encoder produced by whichever factory method could be invoked
+   * @throws RuntimeException if neither factory method can be resolved and invoked. The failure of
+   *     the older method is attached as the cause and the failure of the newer one as a suppressed
+   *     exception.
+   */
+  public static ExpressionEncoder getRowEncoder(StructType sparkSchema) {
+    ReflectiveOperationException newApiFailure;
+    try {
+      // Check if the `RowEncoder` class has the `encoderFor` method. This is for spark 3.5
+      Method encoderForMethod = RowEncoder.class.getMethod("encoderFor", StructType.class);
+      Object agnosticEncoderObj = encoderForMethod.invoke(null, sparkSchema);
+
+      // Resolved by name rather than referenced directly, presumably because the class is not
+      // available on the older spark version.
+      Method applyMethod = ExpressionEncoder.class.getMethod("apply",
+          Class.forName("org.apache.spark.sql.catalyst.encoders.AgnosticEncoder"));
+
+      return (ExpressionEncoder) applyMethod.invoke(null, agnosticEncoderObj);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException
+        | ClassNotFoundException e) {
+      newApiFailure = e;
+    }
+
+    try {
+      // If it reaches here, meaning it should have the older method for 3.3.2 and lower.
+      Method applyMethod = RowEncoder.class.getMethod("apply", StructType.class);
+      return (ExpressionEncoder) applyMethod.invoke(null, sparkSchema);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+      Throwable newApiCause = unwrapInvocationFailure(newApiFailure);
+      Throwable oldApiCause = unwrapInvocationFailure(e);
+      // Report Throwable#toString() rather than getMessage(): the latter is null for an
+      // InvocationTargetException raised by Method#invoke and never includes the exception type.
+      RuntimeException failure = new RuntimeException(
+          "Failed to load a suitable Encoder dynamically. Errors : "
+              + System.lineSeparator() + newApiCause
+              + System.lineSeparator() + oldApiCause,
+          oldApiCause);
+      failure.addSuppressed(newApiCause);
+      throw failure;
+    }
+  }
+
+  /**
+   * Returns the exception thrown by a reflectively invoked method when the given failure merely
+   * wraps it, otherwise the failure itself.
+   */
+  private static Throwable unwrapInvocationFailure(ReflectiveOperationException failure) {
+    boolean wrapsCause = failure instanceof InvocationTargetException && failure.getCause() != null;
+    return wrapsCause ? failure.getCause() : failure;
+  }
 }