Skip to content

Commit

Permalink
refactor: split broker to broker and utils
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Jan 14, 2025
1 parent 1ca4ded commit ec080af
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 64 deletions.
70 changes: 6 additions & 64 deletions src/broker/operandi_broker/broker.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
from logging import getLogger
from os import environ, fork
import psutil
import signal
from os import environ
from time import sleep

from operandi_utils import (
get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri, verify_and_parse_mq_uri)
from operandi_utils.constants import LOG_LEVEL_BROKER
from operandi_utils.rabbitmq.constants import (
RABBITMQ_QUEUE_HPC_DOWNLOADS, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)
from .job_worker_download import JobWorkerDownload
from .job_worker_status import JobWorkerStatus
from .job_worker_submit import JobWorkerSubmit

from .broker_utils import create_child_process, kill_workers


class ServiceBroker:
Expand Down Expand Up @@ -74,7 +71,7 @@ def run_broker(self):
except KeyboardInterrupt:
self.log.info(f"SIGINT signal received. Sending SIGINT to worker processes.")
# Sends SIGINT to workers
self.kill_workers()
kill_workers(self.log, self.queues_and_workers)
self.log.info(f"Closing gracefully in 3 seconds!")
exit(0)
except Exception as error:
Expand All @@ -88,65 +85,10 @@ def create_worker_process(self, queue_name, worker_type: str) -> None:
self.log.info(f"Initializing workers list for queue: {queue_name}")
# Initialize the worker pids list for the queue
self.queues_and_workers[queue_name] = []
child_pid = self.__create_child_process(queue_name=queue_name, worker_type=worker_type)
child_pid = create_child_process(
self.log, self.db_url, self.rabbitmq_url, queue_name, worker_type, self.test_sbatch)
# If creation of the child process was successful
if child_pid:
self.log.info(f"Assigning a new worker process with pid: {child_pid}, to queue: {queue_name}")
# append the pid to the workers list of the queue_name
(self.queues_and_workers[queue_name]).append(child_pid)

# Forks a child process
def __create_child_process(self, queue_name, worker_type: str) -> int:
self.log.info(f"Trying to create a new worker process for queue: {queue_name}")
try:
created_pid = fork()
except Exception as os_error:
self.log.error(f"Failed to create a child process, reason: {os_error}")
return 0

if created_pid != 0:
return created_pid
try:
if worker_type == "status_worker":
child_worker = JobWorkerStatus(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=True)
elif worker_type == "download_worker":
child_worker = JobWorkerDownload(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
else: # worker_type == "submit_worker"
child_worker = JobWorkerSubmit(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
test_sbatch=self.test_sbatch)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
exit(0)
except Exception as e:
self.log.error(f"Worker process failed for queue: {queue_name}, reason: {e}")
exit(-1)

def _send_signal_to_worker(self, worker_pid: int, signal_type: signal):
try:
process = psutil.Process(pid=worker_pid)
process.send_signal(signal_type)
except psutil.ZombieProcess as error:
self.log.info(f"Worker process has become a zombie: {worker_pid}, {error}")
except psutil.NoSuchProcess as error:
self.log.error(f"No such worker process with pid: {worker_pid}, {error}")
except psutil.AccessDenied as error:
self.log.error(f"Access denied to the worker process with pid: {worker_pid}, {error}")

def kill_workers(self):
interrupted_pids = []
self.log.info(f"Starting to send SIGINT to all workers")
# Send SIGINT to all workers
for queue_name in self.queues_and_workers:
self.log.info(f"Sending SIGINT to workers of queue: {queue_name}")
for worker_pid in self.queues_and_workers[queue_name]:
self._send_signal_to_worker(worker_pid=worker_pid, signal_type=signal.SIGINT)
interrupted_pids.append(worker_pid)
sleep(3)
self.log.info(f"Sending SIGKILL (if needed) to previously interrupted workers")
# Check whether workers exited properly
for pid in interrupted_pids:
self._send_signal_to_worker(worker_pid=pid, signal_type=signal.SIGKILL)
67 changes: 67 additions & 0 deletions src/broker/operandi_broker/broker_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from logging import Logger
from os import fork
import psutil
import signal
from time import sleep
from typing import Dict

from .job_worker_download import JobWorkerDownload
from .job_worker_status import JobWorkerStatus
from .job_worker_submit import JobWorkerSubmit


# Forks a child process
def create_child_process(
logger: Logger, db_url: str, rabbitmq_url: str, queue_name: str, worker_type: str, test_batch: bool
) -> int:
logger.info(f"Trying to create a new worker process for queue: {queue_name}")
try:
created_pid = fork()
except Exception as os_error:
logger.error(f"Failed to create a child process, reason: {os_error}")
return 0

if created_pid != 0:
return created_pid
try:
if worker_type == "status_worker":
child_worker = JobWorkerStatus(db_url, rabbitmq_url, queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=True)
elif worker_type == "download_worker":
child_worker = JobWorkerDownload(db_url, rabbitmq_url, queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
else: # worker_type == "submit_worker"
child_worker = JobWorkerSubmit(db_url, rabbitmq_url, queue_name, test_batch)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
exit(0)
except Exception as e:
logger.error(f"Worker process failed for queue: {queue_name}, reason: {e}")
exit(-1)


def send_signal_to_worker(logger: Logger, worker_pid: int, signal_type: signal):
try:
process = psutil.Process(pid=worker_pid)
process.send_signal(signal_type)
except psutil.ZombieProcess as error:
logger.info(f"Worker process has become a zombie: {worker_pid}, {error}")
except psutil.NoSuchProcess as error:
logger.error(f"No such worker process with pid: {worker_pid}, {error}")
except psutil.AccessDenied as error:
logger.error(f"Access denied to the worker process with pid: {worker_pid}, {error}")


def kill_workers(logger: Logger, queues_and_workers: Dict):
interrupted_pids = []
logger.info(f"Starting to send SIGINT to all workers")
# Send SIGINT to all workers
for queue_name in queues_and_workers:
logger.info(f"Sending SIGINT to workers of queue: {queue_name}")
for worker_pid in queues_and_workers[queue_name]:
send_signal_to_worker(logger, worker_pid=worker_pid, signal_type=signal.SIGINT)
interrupted_pids.append(worker_pid)
sleep(3)
logger.info(f"Sending SIGKILL (if needed) to previously interrupted workers")
# Check whether workers exited properly
for pid in interrupted_pids:
send_signal_to_worker(logger, worker_pid=pid, signal_type=signal.SIGKILL)

0 comments on commit ec080af

Please sign in to comment.