diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c2009e..5bd56f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## Version X.X.X (XXXX-XX-XX) + +- Implement support for passing additional status values (statusextras) to Moodle +- Periodically report progress of running jobs back to Moodle +- Creation of new job status values: + - `WAITING_FOR_BACKUP`: All attempt reports are generated and the archive worker service + is waiting for the Moodle backup to be ready. + - `FINALIZING`: The archive worker service is finalizing the archive creation process (checksums, compression, ...). + + ## Version 1.5.0 (2024-07-18) - Optionally scale down large images within quiz reports to preserve space and keep PDF files compact diff --git a/README.md b/README.md index 25552b7..3efcece 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,7 @@ using the following environment variables: - `QUIZ_ARCHIVER_LOG_LEVEL`: Logging level. One of `'CRITICAL'`, `'FATAL'`, `'ERROR'`, `'WARN'`, `'WARNING'`, `'INFO'`, `'DEBUG'` (default=`'INFO'`) - `QUIZ_ARCHIVER_QUEUE_SIZE`: Maximum number of jobs to enqueue (default=`8`) - `QUIZ_ARCHIVER_HISTORY_SIZE`: Maximum number of jobs to remember in job history (default=`128`) +- `QUIZ_ARCHIVER_STATUS_REPORTING_INTERVAL_SEC`: Number of seconds to wait between job progress updates (default=`15`) - `QUIZ_ARCHIVER_REQUEST_TIMEOUT_SEC`: Maximum number of seconds a single job is allowed to run before it is terminated (default=`3600`) - `QUIZ_ARCHIVER_BACKUP_STATUS_RETRY_SEC`: Number of seconds to wait between backup status queries (default=`30`) - `QUIZ_ARCHIVER_DOWNLOAD_MAX_FILESIZE_BYTES`: Maximum number of bytes a generic Moodle file is allowed to have for downloading (default=`(1024 * 10e6)`) diff --git a/archiveworker/custom_types.py b/archiveworker/custom_types.py index 765711c..2ed8a43 100644 --- a/archiveworker/custom_types.py +++ b/archiveworker/custom_types.py @@ -44,6 +44,8 @@ class JobStatus(StrEnum): UNINITIALIZED = 'UNINITIALIZED' AWAITING_PROCESSING = 'AWAITING_PROCESSING' RUNNING = 'RUNNING' + WAITING_FOR_BACKUP = 'WAITING_FOR_BACKUP' + FINALIZING = 'FINALIZING' FINISHED = 'FINISHED' FAILED = 'FAILED' TIMEOUT = 'TIMEOUT' diff --git a/archiveworker/moodle_api.py b/archiveworker/moodle_api.py index 91eebb3..6d3aea3 100644 --- a/archiveworker/moodle_api.py +++ b/archiveworker/moodle_api.py @@ -125,19 +125,27 @@ def check_connection(self) -> bool: self.logger.warning(f'Moodle API connection check failed with Moodle error: {data["errorcode"]}') return False - def update_job_status(self, jobid: UUID, status: JobStatus) -> bool: + def update_job_status(self, jobid: UUID, status: JobStatus, statusextras: Dict = None) -> bool: """ Update the status of a job via the Moodle API :param jobid: UUID of the job to update :param status: New status to set + :param statusextras: Additional status information to include :return: True if the status was updated successfully, False otherwise """ try: + # Prepare statusextras + conditional_params = {} + if statusextras: + conditional_params = {f'statusextras': json.dumps(statusextras)} + + # Call wsfunction to update job status r = requests.get(url=self.ws_rest_url, params=self._generate_wsfunc_request_params( wsfunction=Config.MOODLE_WSFUNCTION_UPDATE_JOB_STATUS, jobid=str(jobid), - status=str(status) + status=str(status), + **conditional_params )) data = r.json() diff --git a/archiveworker/moodle_quiz_archive_worker.py b/archiveworker/moodle_quiz_archive_worker.py index 46f2d3d..735f1a6 100755 --- a/archiveworker/moodle_quiz_archive_worker.py +++ b/archiveworker/moodle_quiz_archive_worker.py @@ -144,7 +144,7 @@ def handle_archive_request(): job = QuizArchiveJob(uuid.uuid1(), job_request) job_queue.put_nowait(job) # Actual queue capacity limit is enforced here! job_history.append(job) - job.set_status(JobStatus.AWAITING_PROCESSING) + job.set_status(JobStatus.AWAITING_PROCESSING, notify_moodle=False) app.logger.info(f"Enqueued job {job.get_id()} from {request.remote_addr}") except TypeError as e: app.logger.debug(f'JSON is technically incomplete or missing a required parameter. TypeError: {str(e)}') diff --git a/archiveworker/quiz_archive_job.py b/archiveworker/quiz_archive_job.py index 78bf2da..49bb7dc 100644 --- a/archiveworker/quiz_archive_job.py +++ b/archiveworker/quiz_archive_job.py @@ -25,7 +25,8 @@ import threading from pathlib import Path from tempfile import TemporaryDirectory -from typing import List +from time import time +from typing import List, Dict from uuid import UUID import requests @@ -46,6 +47,8 @@ class QuizArchiveJob: def __init__(self, jobid: UUID, job_request: JobArchiveRequest): self.id = jobid self.status = JobStatus.UNINITIALIZED + self.statusextras = None + self.last_moodle_status_update = None self.request = job_request self.workdir = None self.archived_attempts = {} @@ -91,19 +94,30 @@ def get_status(self) -> JobStatus: """ return self.status - def set_status(self, status: JobStatus, notify_moodle: bool = False) -> None: + def get_statusextras(self) -> Dict: + """ + Returns additional status information + + :return: Additional status information + """ + return self.statusextras + + def set_status(self, status: JobStatus, statusextras: Dict = None, notify_moodle: bool = False) -> None: """ Updates the status of this job. If notify_moodle is True, the status update is passed to the Moodle API as well. :param status: New job status + :param statusextras: Additional status information :param notify_moodle: Call job status update function via Moodle API if True :return: None """ self.status = status + self.statusextras = statusextras if notify_moodle: - self.moodle_api.update_job_status(jobid=self.id, status=self.status) + self.moodle_api.update_job_status(jobid=self.id, status=self.status, statusextras=self.statusextras) + self.last_moodle_status_update = time() def execute(self) -> None: """ @@ -112,7 +126,7 @@ def execute(self) -> None: :return: None """ self.logger.info(f"Processing job {self.id}") - self.set_status(JobStatus.RUNNING, notify_moodle=True) + self.set_status(JobStatus.RUNNING, statusextras={'progress': 0}, notify_moodle=True) try: with TemporaryDirectory() as tempdir: @@ -133,6 +147,9 @@ def execute(self) -> None: if self.request.tasks['archive_moodle_backups']: asyncio.run(self._process_moodle_backups()) + # Transition to state: FINALIZING + self.set_status(JobStatus.FINALIZING, notify_moodle=True) + # Hash every file self.logger.info("Calculating file hashes ...") archive_files = glob.glob(f'{self.workdir}/**/*', recursive=True) @@ -204,6 +221,7 @@ async def _process_quiz_attempts(self, attemptids: List[int], paper_format: str) if threading.current_thread().stop_requested(): raise InterruptedError('Thread stop requested') else: + # Process attempt await self._render_quiz_attempt(context, attemptid, paper_format) if self.request.tasks['archive_quiz_attempts']['image_optimize']: await self._compress_pdf( @@ -214,6 +232,16 @@ async def _process_quiz_attempts(self, attemptids: List[int], paper_format: str) image_quality=self.request.tasks['archive_quiz_attempts']['image_optimize']['quality'] ) + # Report status + if time() >= self.last_moodle_status_update + Config.STATUS_REPORTING_INTERVAL_SEC: + self.set_status( + JobStatus.RUNNING, + statusextras={'progress': round((len(self.archived_attempts) / len(attemptids)) * 100)}, + notify_moodle=True + ) + else: + self.logger.debug("Skipping status update because reporting interval has not been reached yet") + await browser.close() self.logger.debug("Destroyed playwright Browser and BrowserContext") @@ -543,7 +571,12 @@ async def _process_moodle_backup(self, backupid: str, filename: str, download_ur if status == BackupStatus.SUCCESS: break + # Notify user about waiting self.logger.info(f'Backup {backupid} not finished yet. Waiting {Config.BACKUP_STATUS_RETRY_SEC} seconds before retrying ...') + if self.get_status() != JobStatus.WAITING_FOR_BACKUP: + self.set_status(JobStatus.WAITING_FOR_BACKUP, notify_moodle=True) + + # Wait for next backup status check await asyncio.sleep(Config.BACKUP_STATUS_RETRY_SEC) # Check backup filesize diff --git a/config.py b/config.py index 4479172..0ab1cf6 100644 --- a/config.py +++ b/config.py @@ -44,6 +44,9 @@ class Config: HISTORY_SIZE = int(os.getenv('QUIZ_ARCHIVER_HISTORY_SIZE', default=128)) """Maximum number of jobs to keep in the history before forgetting about them.""" + STATUS_REPORTING_INTERVAL_SEC = int(os.getenv('QUIZ_ARCHIVER_STATUS_REPORTING_INTERVAL_SEC', default=15)) + """Number of seconds to wait between job progress updates""" + REQUEST_TIMEOUT_SEC = int(os.getenv('QUIZ_ARCHIVER_REQUEST_TIMEOUT_SEC', default=(60 * 60))) """Number of seconds before execution of a single request is aborted.""" diff --git a/tests/conftest.py b/tests/conftest.py index d01d150..eeb4ff8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -169,7 +169,7 @@ def get_uploaded_files(self) -> Dict[int, Dict[str, Union[str, Path]]]: def check_connection(self) -> bool: return True - def update_job_status(self, jobid: UUID, status: JobStatus) -> bool: + def update_job_status(self, jobid: UUID, status: JobStatus, statusextras: Dict) -> bool: return True def get_backup_status(self, jobid: UUID, backupid: str) -> BackupStatus: