Feature: Improvement of job status reporting #7

Merged · 5 commits · Jul 23, 2024
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
# Changelog

## Version X.X.X (XXXX-XX-XX)

- Implement support for passing additional status values (statusextras) to Moodle
- Periodically report progress of running jobs back to Moodle
- Add new job status values:
- `WAITING_FOR_BACKUP`: All attempt reports are generated and the archive worker service
is waiting for the Moodle backup to be ready.
- `FINALIZING`: The archive worker service is finalizing the archive creation process (checksums, compression, ...).


## Version 1.5.0 (2024-07-18)

- Optionally scale down large images within quiz reports to preserve space and keep PDF files compact
1 change: 1 addition & 0 deletions README.md
@@ -137,6 +137,7 @@ using the following environment variables:
- `QUIZ_ARCHIVER_LOG_LEVEL`: Logging level. One of `'CRITICAL'`, `'FATAL'`, `'ERROR'`, `'WARN'`, `'WARNING'`, `'INFO'`, `'DEBUG'` (default=`'INFO'`)
- `QUIZ_ARCHIVER_QUEUE_SIZE`: Maximum number of jobs to enqueue (default=`8`)
- `QUIZ_ARCHIVER_HISTORY_SIZE`: Maximum number of jobs to remember in job history (default=`128`)
- `QUIZ_ARCHIVER_STATUS_REPORTING_INTERVAL_SEC`: Number of seconds to wait between job progress updates (default=`15`)
- `QUIZ_ARCHIVER_REQUEST_TIMEOUT_SEC`: Maximum number of seconds a single job is allowed to run before it is terminated (default=`3600`)
- `QUIZ_ARCHIVER_BACKUP_STATUS_RETRY_SEC`: Number of seconds to wait between backup status queries (default=`30`)
- `QUIZ_ARCHIVER_DOWNLOAD_MAX_FILESIZE_BYTES`: Maximum number of bytes a generic Moodle file is allowed to have for downloading (default=`(1024 * 10e6)`)
2 changes: 2 additions & 0 deletions archiveworker/custom_types.py
@@ -44,6 +44,8 @@ class JobStatus(StrEnum):
UNINITIALIZED = 'UNINITIALIZED'
AWAITING_PROCESSING = 'AWAITING_PROCESSING'
RUNNING = 'RUNNING'
WAITING_FOR_BACKUP = 'WAITING_FOR_BACKUP'
FINALIZING = 'FINALIZING'
FINISHED = 'FINISHED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
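
A side note on the two new members above: a minimal, standalone sketch (assuming Python >= 3.11's `enum.StrEnum`, which the class appears to be based on) of why `str(status)` in `moodle_api.py` yields the plain status string:

```python
# Minimal sketch, not project code: StrEnum members stringify to their plain
# value, which is what moodle_api.py relies on when sending status=str(status).
from enum import StrEnum  # Python >= 3.11 is an assumption about the target runtime

class JobStatus(StrEnum):
    WAITING_FOR_BACKUP = 'WAITING_FOR_BACKUP'
    FINALIZING = 'FINALIZING'

print(str(JobStatus.FINALIZING))  # -> FINALIZING
```
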
12 changes: 10 additions & 2 deletions archiveworker/moodle_api.py
@@ -125,19 +125,27 @@ def check_connection(self) -> bool:
self.logger.warning(f'Moodle API connection check failed with Moodle error: {data["errorcode"]}')
return False

def update_job_status(self, jobid: UUID, status: JobStatus) -> bool:
def update_job_status(self, jobid: UUID, status: JobStatus, statusextras: Dict = None) -> bool:
"""
Update the status of a job via the Moodle API

:param jobid: UUID of the job to update
:param status: New status to set
:param statusextras: Additional status information to include
:return: True if the status was updated successfully, False otherwise
"""
try:
# Prepare statusextras
conditional_params = {}
if statusextras:
conditional_params = {'statusextras': json.dumps(statusextras)}

# Call wsfunction to update job status
r = requests.get(url=self.ws_rest_url, params=self._generate_wsfunc_request_params(
wsfunction=Config.MOODLE_WSFUNCTION_UPDATE_JOB_STATUS,
jobid=str(jobid),
status=str(status)
status=str(status),
**conditional_params
))
data = r.json()

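
To make the wire format concrete, here is a standalone sketch of the `conditional_params` logic from the hunk above (values are illustrative, not taken from the project):

```python
# statusextras is JSON-encoded and only added to the web service request
# parameters when a non-empty dict is supplied.
import json

statusextras = {'progress': 42}
conditional_params = {'statusextras': json.dumps(statusextras)} if statusextras else {}
print(conditional_params)  # {'statusextras': '{"progress": 42}'}
```
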
2 changes: 1 addition & 1 deletion archiveworker/moodle_quiz_archive_worker.py
@@ -144,7 +144,7 @@ def handle_archive_request():
job = QuizArchiveJob(uuid.uuid1(), job_request)
job_queue.put_nowait(job) # Actual queue capacity limit is enforced here!
job_history.append(job)
job.set_status(JobStatus.AWAITING_PROCESSING)
job.set_status(JobStatus.AWAITING_PROCESSING, notify_moodle=False)
app.logger.info(f"Enqueued job {job.get_id()} from {request.remote_addr}")
except TypeError as e:
app.logger.debug(f'JSON is technically incomplete or missing a required parameter. TypeError: {str(e)}')
41 changes: 37 additions & 4 deletions archiveworker/quiz_archive_job.py
@@ -25,7 +25,8 @@
import threading
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List
from time import time
from typing import List, Dict
from uuid import UUID

import requests
@@ -46,6 +47,8 @@ class QuizArchiveJob:
def __init__(self, jobid: UUID, job_request: JobArchiveRequest):
self.id = jobid
self.status = JobStatus.UNINITIALIZED
self.statusextras = None
self.last_moodle_status_update = None
self.request = job_request
self.workdir = None
self.archived_attempts = {}
@@ -91,19 +94,30 @@ def get_status(self) -> JobStatus:
"""
return self.status

def set_status(self, status: JobStatus, notify_moodle: bool = False) -> None:
def get_statusextras(self) -> Dict:
"""
Returns additional status information

:return: Additional status information
"""
return self.statusextras

def set_status(self, status: JobStatus, statusextras: Dict = None, notify_moodle: bool = False) -> None:
"""
Updates the status of this job. If notify_moodle is True, the status update
is passed to the Moodle API as well.

:param status: New job status
:param statusextras: Additional status information
:param notify_moodle: Call job status update function via Moodle API if True
:return: None
"""
self.status = status
self.statusextras = statusextras

if notify_moodle:
self.moodle_api.update_job_status(jobid=self.id, status=self.status)
self.moodle_api.update_job_status(jobid=self.id, status=self.status, statusextras=self.statusextras)
self.last_moodle_status_update = time()

def execute(self) -> None:
"""
@@ -112,7 +126,7 @@ def execute(self) -> None:
:return: None
"""
self.logger.info(f"Processing job {self.id}")
self.set_status(JobStatus.RUNNING, notify_moodle=True)
self.set_status(JobStatus.RUNNING, statusextras={'progress': 0}, notify_moodle=True)

try:
with TemporaryDirectory() as tempdir:
@@ -133,6 +147,9 @@
if self.request.tasks['archive_moodle_backups']:
asyncio.run(self._process_moodle_backups())

# Transition to state: FINALIZING
self.set_status(JobStatus.FINALIZING, notify_moodle=True)

# Hash every file
self.logger.info("Calculating file hashes ...")
archive_files = glob.glob(f'{self.workdir}/**/*', recursive=True)
@@ -204,6 +221,7 @@ async def _process_quiz_attempts(self, attemptids: List[int], paper_format: str)
if threading.current_thread().stop_requested():
raise InterruptedError('Thread stop requested')
else:
# Process attempt
await self._render_quiz_attempt(context, attemptid, paper_format)
if self.request.tasks['archive_quiz_attempts']['image_optimize']:
await self._compress_pdf(
@@ -214,6 +232,16 @@ async def _process_quiz_attempts(self, attemptids: List[int], paper_format: str)
image_quality=self.request.tasks['archive_quiz_attempts']['image_optimize']['quality']
)

# Report status
if time() >= self.last_moodle_status_update + Config.STATUS_REPORTING_INTERVAL_SEC:
self.set_status(
JobStatus.RUNNING,
statusextras={'progress': round((len(self.archived_attempts) / len(attemptids)) * 100)},
notify_moodle=True
)
else:
self.logger.debug("Skipping status update because reporting interval has not been reached yet")

await browser.close()
self.logger.debug("Destroyed playwright Browser and BrowserContext")

@@ -543,7 +571,12 @@ async def _process_moodle_backup(self, backupid: str, filename: str, download_ur
if status == BackupStatus.SUCCESS:
break

# Notify user about waiting
self.logger.info(f'Backup {backupid} not finished yet. Waiting {Config.BACKUP_STATUS_RETRY_SEC} seconds before retrying ...')
if self.get_status() != JobStatus.WAITING_FOR_BACKUP:
self.set_status(JobStatus.WAITING_FOR_BACKUP, notify_moodle=True)

# Wait for next backup status check
await asyncio.sleep(Config.BACKUP_STATUS_RETRY_SEC)

# Check backup filesize
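
The core of this PR is the interval-based progress reporting added to `_process_quiz_attempts`. Below is a self-contained sketch of that throttling pattern; the helper function is hypothetical, and in the real code the reporting step is `self.set_status(JobStatus.RUNNING, statusextras=..., notify_moodle=True)`:

```python
# Illustrative throttling sketch: progress is only pushed to Moodle once
# STATUS_REPORTING_INTERVAL_SEC has elapsed since the last notified update.
from time import time

STATUS_REPORTING_INTERVAL_SEC = 15  # default from config.py
last_moodle_status_update = time()  # set when the RUNNING status is first reported

def maybe_report_progress(archived: int, total: int) -> None:
    """Hypothetical helper mirroring the per-attempt check in the loop above."""
    global last_moodle_status_update
    if time() >= last_moodle_status_update + STATUS_REPORTING_INTERVAL_SEC:
        statusextras = {'progress': round((archived / total) * 100)}
        print(f"would call update_job_status(..., statusextras={statusextras})")
        last_moodle_status_update = time()
    else:
        print("skipping status update; reporting interval not reached yet")
```
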
3 changes: 3 additions & 0 deletions config.py
@@ -44,6 +44,9 @@ class Config:
HISTORY_SIZE = int(os.getenv('QUIZ_ARCHIVER_HISTORY_SIZE', default=128))
"""Maximum number of jobs to keep in the history before forgetting about them."""

STATUS_REPORTING_INTERVAL_SEC = int(os.getenv('QUIZ_ARCHIVER_STATUS_REPORTING_INTERVAL_SEC', default=15))
"""Number of seconds to wait between job progress updates"""

REQUEST_TIMEOUT_SEC = int(os.getenv('QUIZ_ARCHIVER_REQUEST_TIMEOUT_SEC', default=(60 * 60)))
"""Number of seconds before execution of a single request is aborted."""

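
A quick way to exercise the new setting locally (assumed usage; `config.py` reads the variable once at import time):

```python
# Override the reporting interval via the environment before config.py is
# imported, e.g. when testing progress reporting against a local Moodle.
import os

os.environ['QUIZ_ARCHIVER_STATUS_REPORTING_INTERVAL_SEC'] = '5'

from config import Config  # assumes the repository root is on sys.path
print(Config.STATUS_REPORTING_INTERVAL_SEC)  # -> 5
```
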
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -169,7 +169,7 @@ def get_uploaded_files(self) -> Dict[int, Dict[str, Union[str, Path]]]:
def check_connection(self) -> bool:
return True

def update_job_status(self, jobid: UUID, status: JobStatus) -> bool:
def update_job_status(self, jobid: UUID, status: JobStatus, statusextras: Dict) -> bool:
return True

def get_backup_status(self, jobid: UUID, backupid: str) -> BackupStatus: