From d6d6b0454a06a33eb3579e3e4b8080742a3fb32f Mon Sep 17 00:00:00 2001 From: Slava Min Date: Mon, 6 Jan 2025 12:06:19 -0800 Subject: [PATCH] =?UTF-8?q?Release=20the=20reference=20to=20the=20calling?= =?UTF-8?q?=20Spark=20session=20upon=20successful=20co=E2=80=A6=20(#4010)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …mpletion of NonFateSharingFuture #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description SparkSession object which initiated the Delta snapshot is kept alive for the lifetime of the snapshot. This change releases the reference to SparkSession once the future completes. ## How was this patch tested? ## Does this PR introduce _any_ user-facing changes? No --- .../spark/sql/delta/util/threads/DeltaThreadPool.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala index d5ec23fb336..62d196e3640 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/threads/DeltaThreadPool.scala @@ -107,7 +107,10 @@ class NonFateSharingFuture[T](pool: DeltaThreadPool)(f: SparkSession => T) // Prefer to get a prefetched result from the future, but never fail because of it. val futureResult = futureOpt.flatMap { case (ownerSession, future) => try { - Some(ThreadUtils.awaitResult(future, timeout)) + val result = Some(ThreadUtils.awaitResult(future, timeout)) + // no reason to keep the reference to the calling session anymore + futureOpt = Some(null, future) + result } catch { // NOTE: ThreadUtils.awaitResult wraps all non-fatal exceptions other than TimeoutException // with SparkException. Meanwhile, Java Future.get only throws four exceptions: