Skip to content

Commit

Permalink
adapt broker and integration tests to the new worker
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Jan 9, 2025
1 parent eb681c6 commit f655e1b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 25 deletions.
8 changes: 5 additions & 3 deletions src/broker/operandi_broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
__all__ = [
"cli",
"ServiceBroker",
"JobDownloadWorker",
"JobStatusWorker",
"Worker"
"JobSubmitWorker",
"ServiceBroker",
]

from .cli import cli
from .broker import ServiceBroker
from .job_download_worker import JobDownloadWorker
from .job_status_worker import JobStatusWorker
from .job_submit_worker import Worker
from .job_submit_worker import JobSubmitWorker
36 changes: 21 additions & 15 deletions src/broker/operandi_broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri, verify_and_parse_mq_uri)
from operandi_utils.constants import LOG_LEVEL_BROKER
from operandi_utils.rabbitmq.constants import (
RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)
from .job_submit_worker import Worker
RABBITMQ_QUEUE_HPC_DOWNLOADS, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)
from .job_download_worker import JobDownloadWorker
from .job_status_worker import JobStatusWorker
from .job_submit_worker import JobSubmitWorker


class ServiceBroker:
Expand Down Expand Up @@ -48,14 +49,15 @@ def run_broker(self):
# A list of queues for which a worker process should be created
queues = [RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS]
status_queue = RABBITMQ_QUEUE_JOB_STATUSES
hpc_download_queue = RABBITMQ_QUEUE_HPC_DOWNLOADS
try:
for queue_name in queues:
self.log.info(f"Creating a worker process to consume from queue: {queue_name}")
self.create_worker_process(
queue_name=queue_name, status_checker=False, tunnel_port_executor=22, tunnel_port_transfer=22)
self.create_worker_process(queue_name, "submit_worker")
self.log.info(f"Creating a status worker process to consume from queue: {status_queue}")
self.create_worker_process(
queue_name=status_queue, status_checker=True, tunnel_port_executor=22, tunnel_port_transfer=22)
self.create_worker_process(status_queue, "status_worker")
self.log.info(f"Creating a download worker process to consume from queue: {hpc_download_queue}")
self.create_worker_process(hpc_download_queue, "download_worker")
except Exception as error:
self.log.error(f"Error while creating worker processes: {error}")

Expand All @@ -81,15 +83,15 @@ def run_broker(self):

# Creates a separate worker process and append its pid if successful
def create_worker_process(
self, queue_name, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22, status_checker=False
self, queue_name, worker_type: str, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22
) -> None:
# If the entry for queue_name does not exist, create it
if queue_name not in self.queues_and_workers:
self.log.info(f"Initializing workers list for queue: {queue_name}")
# Initialize the worker pids list for the queue
self.queues_and_workers[queue_name] = []
child_pid = self.__create_child_process(
queue_name=queue_name, status_checker=status_checker, tunnel_port_executor=tunnel_port_executor,
queue_name=queue_name, worker_type=worker_type, tunnel_port_executor=tunnel_port_executor,
tunnel_port_transfer=tunnel_port_transfer)
# If creation of the child process was successful
if child_pid:
Expand All @@ -99,27 +101,31 @@ def create_worker_process(

# Forks a child process
def __create_child_process(
self, queue_name, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22, status_checker=False
self, queue_name, worker_type: str, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22
) -> int:
self.log.info(f"Trying to create a new worker process for queue: {queue_name}")
try:
# TODO: Try to utilize Popen() instead of fork()
created_pid = fork()
except Exception as os_error:
self.log.error(f"Failed to create a child process, reason: {os_error}")
return 0

if created_pid != 0:
return created_pid
try:
# Clean unnecessary data
# self.queues_and_workers = None
if status_checker:
if worker_type == "status_worker":
child_worker = JobStatusWorker(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
test_sbatch=self.test_sbatch)
else:
child_worker = Worker(
elif worker_type == "download_worker":
child_worker = JobDownloadWorker(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
test_sbatch=self.test_sbatch
)
else: # worker_type == "submit_worker"
child_worker = JobSubmitWorker(
db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
test_sbatch=self.test_sbatch)
Expand Down
2 changes: 1 addition & 1 deletion src/broker/operandi_broker/job_download_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from operandi_utils.hpc import NHRExecutor, NHRTransfer
from operandi_utils.rabbitmq import get_connection_consumer


# TODO: Adapt the JobDownloadWorker to do the task of downloading instead of the job status worker
class JobDownloadWorker:
def __init__(self, db_url, rabbitmq_url, queue_name, tunnel_port_executor, tunnel_port_transfer, test_sbatch=False):
self.log = getLogger(f"operandi_broker.job_download_worker[{getpid()}].{queue_name}")
Expand Down
1 change: 1 addition & 0 deletions src/broker/operandi_broker/job_status_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from operandi_utils.rabbitmq import get_connection_consumer


# TODO: Refactor status worker to not download files. Adapt the JobDownloadWorker to do the task.
class JobStatusWorker:
def __init__(self, db_url, rabbitmq_url, queue_name, tunnel_port_executor, tunnel_port_transfer, test_sbatch=False):
self.log = getLogger(f"operandi_broker.job_status_worker[{getpid()}].{queue_name}")
Expand Down
2 changes: 1 addition & 1 deletion src/broker/operandi_broker/job_submit_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

# Each worker class listens to a specific queue,
# consume messages, and process messages.
class Worker:
class JobSubmitWorker:
def __init__(self, db_url, rabbitmq_url, queue_name, tunnel_port_executor, tunnel_port_transfer, test_sbatch=False):
self.log = getLogger(f"operandi_broker.worker[{getpid()}].{queue_name}")
self.queue_name = queue_name
Expand Down
2 changes: 2 additions & 0 deletions src/utils/operandi_utils/rabbitmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"RABBITMQ_QUEUE_DEFAULT",
"RABBITMQ_QUEUE_JOB_STATUSES",
"RABBITMQ_QUEUE_HARVESTER",
"RABBITMQ_QUEUE_HPC_DOWNLOADS",
"RABBITMQ_QUEUE_USERS",
"RMQConnector"
]
Expand All @@ -17,6 +18,7 @@
RABBITMQ_QUEUE_DEFAULT,
RABBITMQ_QUEUE_JOB_STATUSES,
RABBITMQ_QUEUE_HARVESTER,
RABBITMQ_QUEUE_HPC_DOWNLOADS,
RABBITMQ_QUEUE_USERS
)
from .wrappers import get_connection_consumer, get_connection_publisher
10 changes: 5 additions & 5 deletions tests/integration_tests/test_full_cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from operandi_server.constants import (
DEFAULT_METS_BASENAME, DEFAULT_FILE_GRP, SERVER_WORKFLOW_JOBS_ROUTER, SERVER_WORKSPACES_ROUTER)
from operandi_utils.constants import StateJob
from operandi_utils.rabbitmq import RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_JOB_STATUSES
from operandi_utils.rabbitmq import RABBITMQ_QUEUE_HPC_DOWNLOADS, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_JOB_STATUSES
from operandi_utils.hpc.constants import HPC_NHR_JOB_TEST_PARTITION
from tests.tests_server.helpers_asserts import assert_response_status_code

Expand Down Expand Up @@ -48,11 +48,11 @@ def test_full_cycle(auth_harvester, operandi, service_broker, bytes_small_worksp
assert response.json()["message"] == "The home page of the OPERANDI Server"

# Create a background worker for the harvester queue
service_broker.create_worker_process(
queue_name=RABBITMQ_QUEUE_HARVESTER, status_checker=False, tunnel_port_executor=22, tunnel_port_transfer=22)
service_broker.create_worker_process(RABBITMQ_QUEUE_HARVESTER, "submit_worker")
# Create a background worker for the job statuses queue
service_broker.create_worker_process(
queue_name=RABBITMQ_QUEUE_JOB_STATUSES, status_checker=True, tunnel_port_executor=22, tunnel_port_transfer=22)
service_broker.create_worker_process(RABBITMQ_QUEUE_JOB_STATUSES, "status_worker")
# Create a background worker for the hpc download queue
service_broker.create_worker_process(RABBITMQ_QUEUE_HPC_DOWNLOADS, "download_worker")

# Post a workspace zip
response = operandi.post(url="/workspace", files={"workspace": bytes_small_workspace}, auth=auth_harvester)
Expand Down

0 comments on commit f655e1b

Please sign in to comment.