Skip to content

Commit

Permalink
celery beat to worker docker command for task result cleaup
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Mar 16, 2022
1 parent 637adca commit 7b938fc
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ env
package-lock.json
node_modules

## Celery
celeryconfig*
celery-config*
celerybeat-schedule.*

## Python / Extensions etc.
*~
*.mo
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ node_modules
## Docker
#Dockerfile

## Celery
celeryconfig*
celery-config*
celerybeat-schedule.*

## Python / Extensions etc.
*~
*.mo
Expand Down
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Changes:
- Increase minor version of all ``builtin`` processes that will now be executable in wither (a)synchronous modes.
- Add ``weaver.exec_sync_max_wait`` and ``weaver.quote_sync_max_wait`` settings allowing custom definition for the
maximum duration that can be specified to wait for a `synchronous` response from task workers.
- Add ``-B`` (``celery beat``) option to Docker command of ``weaver-worker`` to run scheduled task in parallel
to ``celery worker`` in order to periodically cleanup task results introduced by *synchronous* execution.
- Improve conformance for returned status codes and error messages when requesting results for an unfinished,
failed, or dismissed ``Job``.
- Adjust conformance item references to correspond with `OGC API - Processes: Part 2` renamed from `Transactions` to
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile-worker
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
&& apt update \
# NOTE:
# Only install CLI package, 'docker-ce' and 'containerd.io' not required as they should be provided by host.
# Docker sibliing execution is expected. See 'docker/docker-compose.yml.example' for details.
# Docker sibling execution is expected. See 'docker/docker-compose.yml.example' for details.
&& apt install --no-install-recommends docker-ce-cli \
&& rm -rf /var/lib/apt/lists/*

# run app
CMD celery worker -E -A pyramid_celery.celery_app --ini "${APP_CONFIG_DIR}/weaver.ini"
CMD celery worker -B -E -A pyramid_celery.celery_app --ini "${APP_CONFIG_DIR}/weaver.ini"
5 changes: 3 additions & 2 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from weaver.datatype import Job
from weaver.processes.convert import OWS_Input_Type, ProcessOWS
from weaver.status import StatusType
from weaver.typedefs import HeadersType, HeaderCookiesType, JSON, SettingsType
from weaver.typedefs import CeleryResult, HeadersType, HeaderCookiesType, JSON, SettingsType
from weaver.visibility import AnyVisibility


Expand Down Expand Up @@ -587,7 +587,8 @@ def submit_job_handler(payload, # type: JSON
resp_headers = {"Location": location_url}
resp_headers.update(applied)

result = execute_process.delay(job_id=job.id, wps_url=clean_ows_url(service_url), headers=headers)
wps_url = clean_ows_url(service_url)
result = execute_process.delay(job_id=job.id, wps_url=wps_url, headers=headers) # type: CeleryResult
LOGGER.debug("Celery pending task [%s] for job [%s].", result.id, job.id)
if not is_execute_async:
LOGGER.debug("Celery task requested as sync if it completes before (wait=%ss)", wait)
Expand Down
3 changes: 3 additions & 0 deletions weaver/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
FileSystemPathType = str

from celery.app import Celery
from celery.result import AsyncResult, EagerResult, GroupResult, ResultSet
from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Process as ProcessOWS, WPSExecution
from pyramid.httpexceptions import HTTPSuccessful, HTTPRedirection
from pyramid.registry import Registry
Expand Down Expand Up @@ -298,3 +299,5 @@ def __call__(self, message: str, progress: Number, status: AnyStatusType, *args:
"inputs": JobInputs,
"outputs": JobOutputs,
})

CeleryResult = Union[AsyncResult, EagerResult, GroupResult, ResultSet]

0 comments on commit 7b938fc

Please sign in to comment.