
User-Defined UDF failed after upgrading from Spark Rapids 23.10.0 to 24.08.1 #433

Closed
LIN-Yu-Ting opened this issue Sep 6, 2024 · 2 comments


LIN-Yu-Ting commented Sep 6, 2024

I followed the tutorial to compile my own libatgxnativeudfjni.so, which contains two UDFs I defined myself, retrieve_rs_id and retrieve_positive_genotype. With this .so file, I could execute my UDFs on Spark Rapids 23.10.0. However, after I recently upgraded Spark Rapids to a locally built 24.10.0-SNAPSHOT, my UDFs can no longer be executed.

Spark throws the following error when I execute this query:

SELECT default.retrieve_rs_id(csq.Existing_variation) FROM vcf LATERAL VIEW explode(vcf.INFO_CSQ)

where I have confirmed that csq.Existing_variation has type LIST of STRING.

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (RetrieveRsID: (array<string>) => array<string>)
	at com.nvidia.spark.rapids.GpuUserDefinedFunction.$anonfun$columnarEval$4(GpuUserDefinedFunction.scala:72)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuUserDefinedFunction.$anonfun$columnarEval$2(GpuUserDefinedFunction.scala:59)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:48)
	at com.nvidia.spark.rapids.GpuUserDefinedFunction.columnarEval(GpuUserDefinedFunction.scala:57)
	at com.nvidia.spark.rapids.GpuUserDefinedFunction.columnarEval$(GpuUserDefinedFunction.scala:55)
	at org.apache.spark.sql.hive.rapids.GpuHiveGenericUDF.columnarEval(hiveUDFs.scala:60)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:34)
	at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:34)
	at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:110)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:34)
	at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:110)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:220)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:217)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:217)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:252)
	at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:110)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$project$2(basicPhysicalOperators.scala:615)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuTieredProject.recurse$2(basicPhysicalOperators.scala:614)
	at com.nvidia.spark.rapids.GpuTieredProject.project(basicPhysicalOperators.scala:627)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$5(basicPhysicalOperators.scala:563)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:272)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$4(basicPhysicalOperators.scala:563)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$3(basicPhysicalOperators.scala:561)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$NoInputSpliterator.next(RmmRapidsRetryIterator.scala:395)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:185)
	at com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$1(basicPhysicalOperators.scala:561)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:39)
	at com.nvidia.spark.rapids.GpuTieredProject.projectWithRetrySingleBatchInternal(basicPhysicalOperators.scala:558)
	at com.nvidia.spark.rapids.GpuTieredProject.projectAndCloseWithRetrySingleBatch(basicPhysicalOperators.scala:597)
	at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$2(basicPhysicalOperators.scala:380)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$1(basicPhysicalOperators.scala:376)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$3(GpuColumnarToRowExec.scala:290)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:287)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:257)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:304)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IllegalArgumentException: inputs not list columns
	at com.atgenomix.seqslab.piper.plugin.atgenomix.udf.hive.RetrieveRsID.retrieveRsID(Native Method)
	at com.atgenomix.seqslab.piper.plugin.atgenomix.udf.hive.RetrieveRsID.evaluateColumnar(RetrieveRsID.java:75)
	at com.nvidia.spark.rapids.GpuUserDefinedFunction.$anonfun$columnarEval$4(GpuUserDefinedFunction.scala:61)
	... 60 more

My RetrieveRsIDJni.cpp starts with:

/**
 * @brief The native implementation of RetrieveRsID.retrieveRsID, which
 * filters a LIST(STRING) column into a LIST(STRING) result.
 *
 * @param env The Java environment
 * @param j_view The address of the cudf column view of the first LIST column
 * @return The address of the cudf column containing the LIST results
 */
JNIEXPORT jlong JNICALL
Java_com_atgenomix_seqslab_piper_plugin_atgenomix_udf_hive_RetrieveRsID_retrieveRsID(JNIEnv* env, jclass, jlong j_view) {
  // Use a try block to translate C++ exceptions into Java exceptions to avoid
  // crashing the JVM if a C++ exception occurs.
  try {
    // turn the addresses into column_view pointers
    auto v = reinterpret_cast<cudf::column_view const*>(j_view);
    if (v->type().id() != cudf::type_id::LIST) {
      throw_java_exception(env, ILLEGAL_ARG_CLASS, "inputs not list columns");
      return 0;
    }

for which I mainly took CosineSimilarityJni.cpp as a reference.
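
For completeness, the rest of the function follows the usual shape from the examples: build the result column, release its ownership to the Java side, and translate any C++ exception into a Java one. A rough sketch of that tail with the real filtering logic elided (the deep copy is only a placeholder, and RUNTIME_EXCEPTION_CLASS is assumed to be a string constant defined alongside ILLEGAL_ARG_CLASS):

    // ... filtering logic producing a std::unique_ptr<cudf::column> goes here ...
    // Placeholder only: deep-copy the input view so the sketch is self-contained.
    auto result = std::make_unique<cudf::column>(*v);
    // Hand ownership of the result column to the Java caller as a raw address.
    return reinterpret_cast<jlong>(result.release());
  } catch (std::exception const& e) {
    // Translate the C++ exception into a Java exception to avoid crashing the JVM.
    throw_java_exception(env, RUNTIME_EXCEPTION_CLASS, e.what());
    return 0;
  }
}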

What could be the root cause that makes my function no longer executable?
I only modified the CMakeLists.txt file, following the newer
https://github.com/NVIDIA/spark-rapids-examples/blob/main/examples/UDF-Examples/RAPIDS-accelerated-UDFs/src/main/cpp/CMakeLists.txt


jlowe (Member) commented Sep 6, 2024

The CMakeLists you link to is compiled against libcudf 24.08 (GIT_TAG branch-24.08), but it sounds like you're running against 24.10.0-SNAPSHOT. It's important to compile the RAPIDS Accelerated UDFs against the same libcudf version used at runtime, as there's no guarantee of backwards compatibility across versions. I'm not 100% convinced that is the root cause here, but mixing versions between compile and runtime can cause very odd issues.

Since this is a SNAPSHOT build it will be a bit trickier to get the correct version since it doesn't have a nice, stable tag to reference. If you cannot use 24.08 instead and need 24.10.0-SNAPSHOT, you can examine the contents of the cudf-java-version-info.properties file within the rapids4spark jar you are using to see what git hash was used for cudf. You can then update the GIT_TAG in your CMakeLists for the cudf dependency to pick up that same version you're running with.

If that does not resolve the issue, it would help if we had more visibility into what type the native UDF actually received. Maybe that will give us a clue as to what's going wrong. For example, we could change the error message to something like this:

    if (v->type().id() != cudf::type_id::LIST) {
      std::stringstream ss;
      ss << "expected LIST, found type ID " << static_cast<int>(v->type().id());
      throw_java_exception(env, ILLEGAL_ARG_CLASS, ss.str().c_str());
      return 0;
    }
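
If the outer type does turn out to be LIST, the same idea extends one level down to confirm the element type, since the UDF expects LIST of STRING. A sketch along those lines (this assumes cudf/lists/lists_column_view.hpp and sstream are included):

    // Also verify the LIST's child is STRING, since the UDF contract is
    // LIST<STRING> -> LIST<STRING>.
    auto const lists = cudf::lists_column_view(*v);
    if (lists.child().type().id() != cudf::type_id::STRING) {
      std::stringstream ss;
      ss << "expected LIST<STRING>, found child type ID "
         << static_cast<int>(lists.child().type().id());
      throw_java_exception(env, ILLEGAL_ARG_CLASS, ss.str().c_str());
      return 0;
    }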

LIN-Yu-Ting (Author) commented

@jlowe Thanks for your comments. After updating CMakeLists.txt with
https://raw.githubusercontent.com/rapidsai/rapids-cmake/branch-24.10/RAPIDS.cmake

and

GIT_REPOSITORY https://github.com/rapidsai/cudf.git
GIT_TAG branch-24.10

I am able to compile our .so file. Thanks so much for your support.
