[SPARK-48634][PYTHON][CONNECT][FOLLOW-UP] Do not make a request if threadpool is not initialized

### What changes were proposed in this pull request?

This PR proposes not to make a request when the thread pool is not initialized, keeping the same behaviour as before apache#46993.
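For illustration, here is a standalone sketch of the pattern this change settles on. `ReleaseScheduler`, `_pool`, `_pool_instance`, and `submit_release` are hypothetical stand-ins for the class-level thread pool in `ExecutePlanResponseReattachableIterator`, not the actual Spark code:

```python
import threading
from multiprocessing.pool import ThreadPool
from typing import Callable, Optional


class ReleaseScheduler:
    """Toy stand-in for the iterator's class-level release thread pool."""

    _lock = threading.RLock()
    _pool_instance: Optional[ThreadPool] = None

    @classmethod
    def _pool(cls) -> ThreadPool:
        # Lazily create the shared pool on first use.
        with cls._lock:
            if cls._pool_instance is None:
                cls._pool_instance = ThreadPool(4)
            return cls._pool_instance

    @classmethod
    def shutdown(cls) -> None:
        # Called when the channel closes: drain and drop the shared pool.
        with cls._lock:
            if cls._pool_instance is not None:
                cls._pool_instance.close()
                cls._pool_instance.join()
                cls._pool_instance = None

    def submit_release(self, target: Callable[[], None]) -> None:
        # The guard added in this PR: only submit when the pool is still
        # initialized, instead of tracking a per-iterator _is_shutdown flag.
        with self._lock:
            if self._pool_instance is not None:
                self._pool().apply_async(target)


if __name__ == "__main__":
    scheduler = ReleaseScheduler()
    scheduler._pool()  # trigger initialization, as __init__ now does
    scheduler.submit_release(lambda: print("release request sent"))
    ReleaseScheduler.shutdown()
    scheduler.submit_release(lambda: print("never runs"))  # skipped: pool is gone
```

Because the check is against the shared pool instance itself rather than a per-iterator flag, release callbacks that fire after shutdown are skipped silently instead of lazily re-creating the pool and issuing a request.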

### Why are the changes needed?

To make the Python process exit silently.

### Does this PR introduce _any_ user-facing change?

Virtually no.

### How was this patch tested?

Manually tested with a long-running Python job and exiting it.
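A hedged sketch of that kind of manual check (assumptions, not part of the PR: a Spark Connect server listening on the default `sc://localhost:15002`, and that stopping the session is what closes the client channel; exact teardown behaviour may vary by version):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

try:
    # A deliberately long-running action to interrupt (e.g. with Ctrl-C).
    spark.range(10_000_000_000).selectExpr("sum(id) AS total").collect()
except KeyboardInterrupt:
    pass
finally:
    # Stopping the session closes the client channel, which is the path that
    # shuts down the shared release thread pool; the process should then exit
    # without ReleaseExecute warnings.
    spark.stop()
```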

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47034 from HyukjinKwon/SPARK-48634-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Jun 20, 2024
1 parent 248fd4c commit 9eadb2c
Showing 2 changed files with 9 additions and 16 deletions.
python/pyspark/sql/connect/client/core.py (2 changes: 1 addition & 1 deletion)
@@ -1108,7 +1108,7 @@ def close(self) -> None:
         """
         Close the channel.
         """
-        ExecutePlanResponseReattachableIterator.shutdown_threadpool()
+        ExecutePlanResponseReattachableIterator.shutdown()
         self._channel.close()
         self._closed = True

python/pyspark/sql/connect/client/reattach.py (23 changes: 8 additions & 15 deletions)
@@ -74,7 +74,7 @@ def _release_thread_pool(cls) -> ThreadPool:
             return cls._release_thread_pool_instance

     @classmethod
-    def shutdown_threadpool(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
         """
         When the channel is closed, this method will be called before, to make sure all
         outstanding calls are closed.
@@ -85,23 +85,14 @@ def shutdown_threadpool(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
                 cls._release_thread_pool.join()  # type: ignore[attr-defined]
                 cls._release_thread_pool_instance = None

-    def shutdown(self: "ExecutePlanResponseReattachableIterator") -> None:
-        """
-        When the channel is closed, this method will be called before, to make sure all
-        outstanding calls are closed, and mark this iterator is shutdown.
-        """
-        with self._lock:
-            self.shutdown_threadpool()
-            self._is_shutdown = True
-
     def __init__(
         self,
         request: pb2.ExecutePlanRequest,
         stub: grpc_lib.SparkConnectServiceStub,
         retrying: Callable[[], Retrying],
         metadata: Iterable[Tuple[str, str]],
     ):
-        self._is_shutdown = False
+        self._release_thread_pool  # Trigger initialization
         self._request = request
         self._retrying = retrying
         if request.operation_id:
@@ -219,8 +210,9 @@ def target() -> None:
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")

-        if not self._is_shutdown:
-            self._release_thread_pool.apply_async(target)
+        with self._lock:
+            if self._release_thread_pool_instance is not None:
+                self._release_thread_pool.apply_async(target)

     def _release_all(self) -> None:
         """
@@ -243,8 +235,9 @@ def target() -> None:
             except Exception as e:
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")

-        if not self._is_shutdown:
-            self._release_thread_pool.apply_async(target)
+        with self._lock:
+            if self._release_thread_pool_instance is not None:
+                self._release_thread_pool.apply_async(target)
         self._result_complete = True

     def _call_iter(self, iter_fun: Callable) -> Any:
