Skip to content

Commit

Permalink
We can now terminate properly
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Michael Smith <[email protected]>
  • Loading branch information
tlrmchlsmth committed Nov 22, 2024
1 parent d4b55ae commit feeed73
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 21 deletions.
3 changes: 2 additions & 1 deletion vllm/distributed/device_communicators/shm_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
# measured to be around 3e-7 seconds. However on earlier versions of Python
# os.sched_yield() does not release the GIL, so we fall back to time.sleep(0)
# os.sched_yield() releases the GIL on 3.11.1+ and on 3.10.8+ (backport);
# on anything older we fall back to time.sleep(0) for the busy-wait loop.
USE_SCHED_YIELD = ((sys.version_info[:3] >= (3, 11, 1))
                   or (sys.version_info[:2] == (3, 10)
                       and sys.version_info[2] >= 8))


class ShmRingBuffer:
Expand Down
19 changes: 11 additions & 8 deletions vllm/v1/executor/multiproc_gpu_executor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import atexit
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple
Expand All @@ -20,8 +21,8 @@
class MultiprocessingGPUExecutor:

def __init__(self, vllm_config: VllmConfig) -> None:
# Store early so we can count on using it at shutdown
self._TERMINATE_VALUE = ExecutorMsgType.TERMINATE.value
# Register self.shutdown so we can make sure to call it on exit
atexit.register(self.shutdown)

self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
Expand Down Expand Up @@ -143,14 +144,16 @@ def execute_model(

def shutdown(self):
    """Properly shut down the executor and its workers.

    Idempotent: it is registered with atexit and also called from
    __del__, so it may run more than once.  The TERMINATE message is
    sent to the workers at most once, and only if the scheduler output
    message queue was actually created (initialization may have failed
    part-way through).
    """
    # NOTE: the negation must apply only to _shutdown_called; negating
    # the whole conjunction would enqueue on a missing/None queue.
    if (not getattr(self, '_shutdown_called', False)
            and hasattr(self, 'scheduler_output_mq')
            and self.scheduler_output_mq is not None):
        termination_msg = ExecutorMsg(ExecutorMsgType.TERMINATE.value,
                                      None)
        self.scheduler_output_mq.enqueue(termination_msg)
    # Mark done first so a failure below still prevents a double-send.
    self._shutdown_called = True
    self.scheduler_output_mq = None

def __del__(self):
    """Best-effort cleanup on garbage collection.

    shutdown() is idempotent (atexit also invokes it).  The hasattr
    guard protects against destruction of a partially constructed
    instance or lookup failures during interpreter teardown.
    """
    if hasattr(self, 'shutdown'):
        self.shutdown()

def check_health(self) -> None:
# GPUExecutor will always be healthy as long as
Expand Down
12 changes: 0 additions & 12 deletions vllm/v1/worker/gpu_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import gc
import multiprocessing
import os
import time
from dataclasses import dataclass
from multiprocessing.process import BaseProcess
from typing import TYPE_CHECKING, Optional, Tuple
Expand Down Expand Up @@ -246,17 +245,6 @@ def start_busy_loop(self) -> None:
socket.send_multipart(
(WorkerInitRequestType.BEGIN_MODEL_EXECUTION.value, ))

def terminate(self) -> None:
    """Stop the worker process, escalating to SIGKILL if needed.

    Sends SIGTERM first, waits up to 5 seconds for the process to
    exit, and kills it if it is still alive.
    """
    self.proc.terminate()
    # join(timeout) blocks until exit or timeout — no need for a
    # hand-rolled time.time()/sleep(0.1) polling loop.
    self.proc.join(timeout=5)
    if self.proc.is_alive():
        self.proc.kill()
        # Reap the killed process so it does not linger as a zombie.
        self.proc.join()


class WorkerProc:
"""Wrapper that runs one Worker in a separate process."""
Expand Down

0 comments on commit feeed73

Please sign in to comment.