Skip to content

Commit

Permalink
[SPARK-45211][CONNECT] Eliminated ambiguous references in `CloseableI…
Browse files Browse the repository at this point in the history
…terator#apply` to fix Scala 2.13 daily test

### What changes were proposed in this pull request?
This pr eliminated an ambiguous references in `org.apache.spark.sql.connect.client.CloseableIterator#apply` function to make the test case `abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error` can test pass with Scala 2.13.

### Why are the changes needed?
`abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error`  failed in the daily test of Scala 2.13:
- https://github.com/apache/spark/actions/runs/6215331575/job/16868131377

<img width="1190" alt="image" src="https://github.com/apache/spark/assets/1475305/466370fd-15e5-4ffd-9407-2e4a2fc4efe7">

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Manual check

run

```
dev/change-scala-version.sh 2.13
build/sbt "connect/testOnly org.apache.spark.sql.connect.execution.ReattachableExecuteSuite" -Pscala-2.13
```

**Before**

```
[info] ReattachableExecuteSuite:
[info] - reattach after initial RPC ends (2 seconds, 258 milliseconds)
[info] - raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error (30 milliseconds)
[info] - raw new RPC interrupts previous RPC with INVALID_CURSOR.DISCONNECTED error (21 milliseconds)
[info] - client INVALID_CURSOR.DISCONNECTED error is retried when rpc sender gets interrupted (602 milliseconds)
[info] - client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one (637 milliseconds)
[info] - abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error *** FAILED *** (70 milliseconds)
[info]   Expected exception org.apache.spark.SparkException to be thrown, but java.lang.StackOverflowError was thrown (ReattachableExecuteSuite.scala:172)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Assertions.intercept(Assertions.scala:756)
[info]   at org.scalatest.Assertions.intercept$(Assertions.scala:746)
[info]   at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1564)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$18(ReattachableExecuteSuite.scala:172)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$18$adapted(ReattachableExecuteSuite.scala:168)
[info]   at org.apache.spark.sql.connect.SparkConnectServerTest.withCustomBlockingStub(SparkConnectServerTest.scala:222)
[info]   at org.apache.spark.sql.connect.SparkConnectServerTest.withCustomBlockingStub$(SparkConnectServerTest.scala:216)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.withCustomBlockingStub(ReattachableExecuteSuite.scala:30)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$16(ReattachableExecuteSuite.scala:168)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$16$adapted(ReattachableExecuteSuite.scala:151)
[info]   at org.apache.spark.sql.connect.SparkConnectServerTest.withClient(SparkConnectServerTest.scala:199)
[info]   at org.apache.spark.sql.connect.SparkConnectServerTest.withClient$(SparkConnectServerTest.scala:191)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.withClient(ReattachableExecuteSuite.scala:30)
[info]   at org.apache.spark.sql.connect.execution.ReattachableExecuteSuite.$anonfun$new$15(ReattachableExecuteSuite.scala:151)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
[info]   Cause: java.lang.StackOverflowError:
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
[info]   at org.apache.spark.sql.connect.client.WrappedCloseableIterator.hasNext(CloseableIterator.scala:36)
...
[info] - client releases responses directly after consuming them (236 milliseconds)
[info] - server releases responses automatically when client moves ahead (336 milliseconds)
[info] - big query (863 milliseconds)
[info] - big query and slow client (7 seconds, 14 milliseconds)
[info] - big query with frequent reattach (735 milliseconds)
[info] - big query with frequent reattach and slow client (7 seconds, 606 milliseconds)
[info] - long sleeping query (10 seconds, 156 milliseconds)
[info] Run completed in 34 seconds, 522 milliseconds.
[info] Total number of tests run: 13
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 12, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.connect.execution.ReattachableExecuteSuite
```

**After**

```
[info] ReattachableExecuteSuite:
[info] - reattach after initial RPC ends (2 seconds, 134 milliseconds)
[info] - raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error (26 milliseconds)
[info] - raw new RPC interrupts previous RPC with INVALID_CURSOR.DISCONNECTED error (19 milliseconds)
[info] - client INVALID_CURSOR.DISCONNECTED error is retried when rpc sender gets interrupted (328 milliseconds)
[info] - client INVALID_CURSOR.DISCONNECTED error is retried when other RPC preempts this one (562 milliseconds)
[info] - abandoned query gets INVALID_HANDLE.OPERATION_ABANDONED error (46 milliseconds)
[info] - client releases responses directly after consuming them (231 milliseconds)
[info] - server releases responses automatically when client moves ahead (359 milliseconds)
[info] - big query (978 milliseconds)
[info] - big query and slow client (7 seconds, 50 milliseconds)
[info] - big query with frequent reattach (703 milliseconds)
[info] - big query with frequent reattach and slow client (7 seconds, 626 milliseconds)
[info] - long sleeping query (10 seconds, 141 milliseconds)
[info] Run completed in 33 seconds, 844 milliseconds.
[info] Total number of tests run: 13
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#42981 from LuciferYang/CloseableIterator-apply.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
  • Loading branch information
LuciferYang committed Sep 19, 2023
1 parent d202c70 commit eec0907
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ private[sql] object CloseableIterator {
*/
def apply[T](iterator: Iterator[T]): CloseableIterator[T] = iterator match {
case closeable: CloseableIterator[T] => closeable
case _ =>
case iter =>
new WrappedCloseableIterator[T] {
override def innerIterator = iterator
override def innerIterator: Iterator[T] = iter
}
}
}

0 comments on commit eec0907

Please sign in to comment.