I followed the tutorial to compile my own libatgxnativeudfjni.so, which contains two UDFs I defined myself: retrieve_rs_id and retrieve_positive_genotype. With this .so file, I could execute my UDFs with Spark Rapids 23.10.0. However, after I recently upgraded my Spark Rapids to a locally built 24.10.0-SNAPSHOT, my UDFs can no longer be executed.
It throws the ERROR message below when I execute the following command.
SELECT default.retrieve_rs_id(csq.Existing_variation) FROM vcf LATERAL VIEW explode(vcf.INFO_CSQ)
where I have confirmed that csq.Existing_variation has type LIST of STRING.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (RetrieveRsID: (array<string>) => array<string>)
at com.nvidia.spark.rapids.GpuUserDefinedFunction.$anonfun$columnarEval$4(GpuUserDefinedFunction.scala:72)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at com.nvidia.spark.rapids.GpuUserDefinedFunction.$anonfun$columnarEval$2(GpuUserDefinedFunction.scala:59)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:48)
at com.nvidia.spark.rapids.GpuUserDefinedFunction.columnarEval(GpuUserDefinedFunction.scala:57)
at com.nvidia.spark.rapids.GpuUserDefinedFunction.columnarEval$(GpuUserDefinedFunction.scala:55)
at org.apache.spark.sql.hive.rapids.GpuHiveGenericUDF.columnarEval(hiveUDFs.scala:60)
at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:34)
at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:34)
at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:110)
at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:34)
at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:110)
at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:220)
at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:217)
at scala.collection.immutable.List.foreach(List.scala:431)
at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:217)
at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:252)
at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:110)
at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$project$2(basicPhysicalOperators.scala:615)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at com.nvidia.spark.rapids.GpuTieredProject.recurse$2(basicPhysicalOperators.scala:614)
at com.nvidia.spark.rapids.GpuTieredProject.project(basicPhysicalOperators.scala:627)
at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$5(basicPhysicalOperators.scala:563)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:272)
at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$4(basicPhysicalOperators.scala:563)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$3(basicPhysicalOperators.scala:561)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$NoInputSpliterator.next(RmmRapidsRetryIterator.scala:395)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:185)
at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$1(basicPhysicalOperators.scala:561)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:39)
at com.nvidia.spark.rapids.GpuTieredProject.projectWithRetrySingleBatchInternal(basicPhysicalOperators.scala:558)
at com.nvidia.spark.rapids.GpuTieredProject.projectAndCloseWithRetrySingleBatch(basicPhysicalOperators.scala:597)
at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$2(basicPhysicalOperators.scala:380)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$1(basicPhysicalOperators.scala:376)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$3(GpuColumnarToRowExec.scala:290)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:287)
at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:257)
at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:304)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.IllegalArgumentException: inputs not list columns
at com.atgenomix.seqslab.piper.plugin.atgenomix.udf.hive.RetrieveRsID.retrieveRsID(Native Method)
at com.atgenomix.seqslab.piper.plugin.atgenomix.udf.hive.RetrieveRsID.evaluateColumnar(RetrieveRsID.java:75)
at com.nvidia.spark.rapids.GpuUserDefinedFunction.$anonfun$columnarEval$4(GpuUserDefinedFunction.scala:61)
... 60 more
and my RetrieveRsIDJni.cpp is started with
/**
* @brief The native implementation of RetrieveRsID.retrieveRsID which
* filter the LIST(STRING) columns as a LIST(STRING) result.
*
* @param env The Java environment
* @param j_view The address of the cudf column view of the first LIST column
* @return The address of the cudf column containing the LIST results
*/
JNIEXPORT jlong JNICALL
Java_com_atgenomix_seqslab_piper_plugin_atgenomix_udf_hive_RetrieveRsID_retrieveRsID(JNIEnv* env, jclass, jlong j_view) {
// Use a try block to translate C++ exceptions into Java exceptions to avoid
// crashing the JVM if a C++ exception occurs.
try {
// turn the addresses into column_view pointers
auto v = reinterpret_cast<cudf::column_view const*>(j_view);
if (v->type().id() != cudf::type_id::LIST) {
throw_java_exception(env, ILLEGAL_ARG_CLASS, "inputs not list columns");
return 0;
}
for which I mainly took CosineSimilarityJni.cpp as a reference.
The CMakeLists you link to is compiled against libcudf 24.08 (GIT_TAG branch-24.08), but it sounds like you're running against 24.10.0-SNAPSHOT. It's important to compile the RAPIDS Accelerated UDFs against the same libcudf version used at runtime, as there's no guarantee of backwards compatibility across versions. I'm not 100% convinced that is the root cause here, but mixing versions between compile and runtime can cause very odd issues.
Since this is a SNAPSHOT build, it will be a bit trickier to get the correct version, since it doesn't have a nice, stable tag to reference. If you cannot use 24.08 and need 24.10.0-SNAPSHOT, you can examine the contents of the cudf-java-version-info.properties file within the rapids-4-spark jar you are using to see which git hash was used for cudf. You can then update the GIT_TAG for the cudf dependency in your CMakeLists to pick up the same version you are running against.
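For example, assuming the CMakeLists declares the cudf dependency with a GIT_TAG (as the linked example does) and the jar follows the usual rapids-4-spark naming, the change might look like the fragment below. `<cudf-commit-hash>` is a placeholder for whatever hash the properties file reports; one way to read it is `unzip -p rapids-4-spark_*.jar cudf-java-version-info.properties` (jar name pattern assumed).

```cmake
# Pin the cudf dependency to the exact commit the SNAPSHOT jar was built with.
# <cudf-commit-hash> is a placeholder; substitute the hash found in
# cudf-java-version-info.properties inside your rapids-4-spark jar.
GIT_TAG <cudf-commit-hash>  # was: GIT_TAG branch-24.08
```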
If that does not resolve the issue, it would help if we had more visibility into what type the native UDF actually received. Maybe that will give us a clue as to what's going wrong. For example, we could change the error message to something like this:
if (v->type().id() != cudf::type_id::LIST) {
  std::stringstream ss;
  ss << "expected LIST found type ID " << static_cast<int>(v->type().id());
  throw_java_exception(env, ILLEGAL_ARG_CLASS, ss.str().c_str());
  return 0;
}
What could be the root cause that makes my function no longer executable?
The only file I modified was CMakeLists.txt, starting from the newer version at:
https://github.com/NVIDIA/spark-rapids-examples/blob/main/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/CMakeLists.txt