
feat: add keda autoscaling support #10

Merged: 13 commits, Sep 12, 2024
92 changes: 74 additions & 18 deletions README.md
@@ -23,30 +23,49 @@ tutor plugins enable celery

### Celery queues

By default, in a standard Open edX installation with Tutor on Kubernetes, all the LMS/CMS async tasks are executed
by a single Celery deployment. This plugin lets you distribute the async workload by configuring additional deployments
that execute the Celery tasks sent to specific queues. This can help to:

- Achieve better performance when there is a high volume of async tasks to process
- Configure different scaling parameters according to the nature of the tasks a queue processes (I/O-bound tasks,
  CPU-bound tasks, etc.)

To achieve this, the plugin implements the `CELERY_WORKERS_CONFIG` filter, which adds extra queues whose tasks need
to be processed by a separate deployment.

## Recommended multiqueue configuration

Based on the LMS and CMS codebases, the default queues for each service are:

- **CMS**: default, high, low (taken from CMS settings [here](https://github.com/openedx/edx-platform/blob/open-release/redwood.master/cms/envs/common.py#L1578-L1582))
- **LMS**: default, high, high_mem (taken from LMS settings [here](https://github.com/openedx/edx-platform/blob/open-release/redwood.master/lms/envs/common.py#L2913-L2917))

> [!NOTE]
> We recommend using [tutor-contrib-pod-autoscaling](https://github.com/eduNEXT/tutor-contrib-pod-autoscaling)
> to set up resource requests and limits.
By default, Tutor implements a single deployment to process the tasks on all LMS/CMS queues. The `CELERY_WORKERS_CONFIG`
filter can be used to add the extra queues from the LMS/CMS configuration:

```python
from tutorcelery.hooks import CELERY_WORKERS_CONFIG


@CELERY_WORKERS_CONFIG.add()
def _add_celery_workers_config(workers_config):
    # Add the LMS extra queues. Make sure each key matches the
    # queue name, e.g. "high" maps to edx.lms.core.high
    workers_config["lms"]["high"] = {}
    workers_config["lms"]["high_mem"] = {}

    # Add the CMS extra queues
    workers_config["cms"]["high"] = {}
    workers_config["cms"]["low"] = {}
    return workers_config
```
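This snippet would typically live in a small custom Tutor plugin that is installed and enabled alongside
tutor-contrib-celery, so that the filter callback runs whenever the environment is rendered.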
With this configuration, 4 new deployments will be created (one for each new queue) to process tasks separately
according to the queue they are sent to: `lms-worker-high`, `lms-worker-high-mem`, `cms-worker-high` and
`cms-worker-low`, following the `<service>-worker-<variant>` naming used by the plugin templates. Additionally,
the default Tutor LMS/CMS Celery deployments are patched to ONLY process the tasks sent to the "default" queue.

This is the recommended configuration for a multiqueue approach with the LMS and CMS, given the queues each
service defines in its default settings. However, the `CELERY_WORKERS_CONFIG` filter can be adapted to different
configuration scenarios.

This plugin also provides a setting to route LMS/CMS tasks directly to a specific queue. It extends/overrides
the default `EXPLICIT_QUEUES` setting:
@@ -60,6 +79,42 @@ CELERY_CMS_EXPLICIT_QUEUES:
queue: edx.cms.core.high
```
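For reference, here is a minimal sketch of what the rendered LMS production settings do with this value, assuming
`CELERY_LMS_EXPLICIT_QUEUES` maps a fully-qualified task name to its routing options (the task name below is only
an illustration):

```python
# Sketch of the rendered openedx-lms-production-settings patch (illustrative values)
try:
    # EXPLICIT_QUEUES is already defined by the LMS settings: extend it
    EXPLICIT_QUEUES.update(
        {
            "lms.djangoapps.grades.tasks.compute_all_grades_for_course": {
                "queue": "edx.lms.core.high",
            },
        }
    )
except NameError:
    EXPLICIT_QUEUES = {
        "lms.djangoapps.grades.tasks.compute_all_grades_for_course": {
            "queue": "edx.lms.core.high",
        },
    }
```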

### Autoscaling

As an alternative to the CPU/memory-based autoscaling offered by the [tutor-contrib-pod-autoscaling](https://github.com/eduNEXT/tutor-contrib-pod-autoscaling) plugin,
this plugin supports autoscaling Celery workers based on the size of a given worker's queue. We use
[KEDA](https://keda.sh/docs) for this purpose; check the Keda documentation to find out more.

To enable autoscaling, set the `enable_keda` key to `True` for each queue variant you want to autoscale. The default parameters are the following:

```python
{
    "min_replicas": 0,
    "max_replicas": 30,
    "list_length": 40,
    "enable_keda": False,
}
```

> [!NOTE]
> You can use the `CELERY_WORKERS_CONFIG` filter as shown above to modify the scaling parameters for each queue.
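
For instance, a minimal sketch (reusing the filter shown above; the replica counts and list length are illustrative)
that enables KEDA for the LMS `high` queue:

```python
from tutorcelery.hooks import CELERY_WORKERS_CONFIG


@CELERY_WORKERS_CONFIG.add()
def _enable_keda_autoscaling(workers_config):
    # Scale lms-worker-high based on the length of the edx.lms.core.high
    # queue: a target of 20 pending tasks per replica, up to 15 replicas.
    workers_config["lms"]["high"] = {
        "enable_keda": True,
        "min_replicas": 0,
        "max_replicas": 15,
        "list_length": 20,
    }
    return workers_config
```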

If you are using [tutor-contrib-pod-autoscaling](https://github.com/eduNEXT/tutor-contrib-pod-autoscaling) and want to set up KEDA autoscaling, make sure to disable the HPA for the `lms-worker` and the `cms-worker`, as **using both autoscalers at the same time is not recommended**:

```python
from tutorpod_autoscaling.hooks import AUTOSCALING_CONFIG

@AUTOSCALING_CONFIG.add()
def _add_my_autoscaling(autoscaling_config):
    autoscaling_config["lms-worker"].update({
        "enable_hpa": False,
    })
    autoscaling_config["cms-worker"].update({
        "enable_hpa": False,
    })
    return autoscaling_config
```
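Both filter callbacks can live in the same custom plugin module. Note that importing `tutorpod_autoscaling.hooks`
assumes tutor-contrib-pod-autoscaling is installed, so guard the import accordingly if that plugin is optional in
your deployment.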

### Enable flower

For troubleshooting purposes, you can enable a Flower deployment to monitor the Celery queues in real time
@@ -91,6 +146,7 @@ CELERY_FLOWER_SERVICE_MONITOR: true
```

License
*******

---

This software is licensed under the terms of the AGPLv3.
27 changes: 27 additions & 0 deletions tutorcelery/hooks.py
@@ -0,0 +1,27 @@
"""
These hooks are stored in a separate module. If they were included in plugin.py, then
the pod-autoscaling hooks would be created in the context of some other plugin that imports
them.
"""

from __future__ import annotations
import sys

if sys.version_info < (3, 11):
    from typing_extensions import TypedDict, NotRequired
else:
    from typing import TypedDict, NotRequired

from tutor.core.hooks import Filter


class CELERY_WORKERS_ATTRS_TYPE(TypedDict):
    min_replicas: NotRequired[int]
    max_replicas: NotRequired[int]
    list_length: NotRequired[int]
    enable_keda: bool


CELERY_WORKERS_CONFIG: Filter[dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]], []] = (
    Filter()
)
6 changes: 2 additions & 4 deletions tutorcelery/patches/k8s-deployments
@@ -1,6 +1,5 @@
{% if CELERY_MULTIQUEUE_ENABLED %}
{% for service, variants in CELERY_WORKER_VARIANTS.items() %}
{% for variant in variants%}
{% for service, variants in iter_celery_workers_config().items() %}
{% for variant, config in variants.items() if variant != 'default' %}
{% set deployment = service + "-" + "worker" + "-" + variant.replace("_", "-") %}
---
apiVersion: apps/v1
@@ -53,7 +52,6 @@ spec:
name: openedx-config
{% endfor %}
{% endfor %}
{% endif %}
{% if CELERY_FLOWER -%}
---
apiVersion: apps/v1
8 changes: 6 additions & 2 deletions tutorcelery/patches/k8s-override
@@ -1,5 +1,6 @@
{% if CELERY_MULTIQUEUE_ENABLED %}
{% for service in ["lms", "cms"] %}
{% set exclude = "lms" if service == "cms" else "cms" %}
{% set service_variants = iter_celery_workers_config().get(service) %}
---
apiVersion: apps/v1
kind: Deployment
@@ -17,6 +18,9 @@ spec:
- "--loglevel=info"
- "--hostname=edx.{{service}}.core.default.%%h"
- "--max-tasks-per-child=100"
{% if is_celery_multiqueue(service) -%}
- "--queues=edx.{{service}}.core.default"
{% else -%}
- "--exclude-queues=edx.{{exclude}}.core.default"
{% endif -%}
{% endfor %}
{% endif %}
1 change: 1 addition & 0 deletions tutorcelery/patches/kustomization-resources
@@ -0,0 +1 @@
- plugins/celery/k8s/keda.yml
2 changes: 0 additions & 2 deletions tutorcelery/patches/openedx-cms-production-settings
@@ -1,8 +1,6 @@
{% if CELERY_MULTIQUEUE_ENABLED %}
try:
    EXPLICIT_QUEUES.update({{CELERY_CMS_EXPLICIT_QUEUES}})
except NameError:
    EXPLICIT_QUEUES = {{CELERY_CMS_EXPLICIT_QUEUES}}
{% endif %}
# Prevents losing tasks when workers are shutdown
CELERY_ACKS_LATE = True
3 changes: 0 additions & 3 deletions tutorcelery/patches/openedx-lms-production-settings
@@ -1,9 +1,6 @@
{% if CELERY_MULTIQUEUE_ENABLED %}
try:
    EXPLICIT_QUEUES.update({{CELERY_LMS_EXPLICIT_QUEUES}})
except NameError:
    EXPLICIT_QUEUES = {{CELERY_LMS_EXPLICIT_QUEUES}}
{% endif %}

# Prevents losing tasks when workers are shutdown
CELERY_ACKS_LATE = True
80 changes: 75 additions & 5 deletions tutorcelery/plugin.py
@@ -5,31 +5,93 @@

import click
import importlib_resources
import tutor
from tutor import hooks

from .__about__ import __version__
from .hooks import CELERY_WORKERS_CONFIG, CELERY_WORKERS_ATTRS_TYPE

########################################
# CONFIGURATION
########################################

CORE_CELERY_WORKER_CONFIG: dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]] = {
    "lms": {
        "default": {
            "min_replicas": 0,
            "max_replicas": 10,
            "list_length": 40,
            "enable_keda": False,
        },
    },
    "cms": {
        "default": {
            "min_replicas": 0,
            "max_replicas": 10,
            "list_length": 40,
            "enable_keda": False,
        },
    },
}


# The core celery worker configs are added with a high priority, so that users can
# override or remove them.
@CELERY_WORKERS_CONFIG.add(priority=hooks.priorities.HIGH)
def _add_core_autoscaling_config(
    scaling_config: dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]
) -> dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]:
    scaling_config.update(CORE_CELERY_WORKER_CONFIG)
    return scaling_config


@tutor.hooks.lru_cache
def get_celery_workers_config() -> dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]:
    """
    This function is cached for performance.
    """
    return CELERY_WORKERS_CONFIG.apply({})


def iter_celery_workers_config() -> dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]:
    """
    Return the full workers config, mapping each service name to its queue variants.
    """
    return {name: config for name, config in get_celery_workers_config().items()}


def is_celery_multiqueue(service: str) -> bool:
    """
    Check whether celery is configured in multiqueue mode for a given service.
    """
    service_celery_config = iter_celery_workers_config().get(service, {})
    service_queue_len = len(service_celery_config)

    # If no queue variants are configured, multiqueue is disabled
    if not service_queue_len:
        return False

    # Multiqueue is not enabled if only the default variant is available
    if service_queue_len == 1 and "default" in service_celery_config:
        return False

    return True


hooks.Filters.CONFIG_DEFAULTS.add_items(
    [
        # Add your new settings that have default values here.
        # Each new setting is a pair: (setting_name, default_value).
        # Prefix your setting names with 'CELERY_'.
        ("CELERY_VERSION", __version__),
        (
            "CELERY_WORKER_VARIANTS",
            {"lms": ["high", "high_mem"], "cms": ["high", "low"]},
        ),
        ("CELERY_LMS_EXPLICIT_QUEUES", {}),
        ("CELERY_CMS_EXPLICIT_QUEUES", {}),
        ("CELERY_FLOWER", False),
        ("CELERY_FLOWER_EXPOSE_SERVICE", False),
        ("CELERY_FLOWER_HOST", "flower.{{LMS_HOST}}"),
        ("CELERY_FLOWER_DOCKER_IMAGE", "docker.io/mher/flower:2.0.1"),
        ("CELERY_MULTIQUEUE_ENABLED", False),
        ("CELERY_FLOWER_SERVICE_MONITOR", False),
    ]
)
@@ -158,10 +220,18 @@
    [
        ("celery/build", "plugins"),
        ("celery/apps", "plugins"),
        ("celery/k8s", "plugins"),
    ],
)


# Make the celery hook functions available within templates
hooks.Filters.ENV_TEMPLATE_VARIABLES.add_items(
    [
        ("iter_celery_workers_config", iter_celery_workers_config),
        ("is_celery_multiqueue", is_celery_multiqueue),
    ]
)
########################################
# PATCH LOADING
# (It is safe & recommended to leave
26 changes: 26 additions & 0 deletions tutorcelery/templates/celery/k8s/keda.yml
@@ -0,0 +1,26 @@
{% for service, variants in iter_celery_workers_config().items() %}
{% for variant, config in variants.items() if config.get('enable_keda') %}
{% set deployment = service + "-" + "worker" + "-" + variant.replace("_", "-") %}
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: {% if variant != 'default' %}{{ deployment }}{% else %}{{ service }}-worker{% endif %}-scaledobject
spec:
  minReplicaCount: {{ config.get("min_replicas", 0) }}
  maxReplicaCount: {{ config.get("max_replicas", 30) }}
  scaleTargetRef:
    kind: Deployment
    name: {% if variant != 'default' %}{{ deployment }}{% else %}{{ service }}-worker{% endif %}
  triggers:
    - metadata:
        {% if REDIS_HOST == 'redis' -%}
        address: redis.{{ K8S_NAMESPACE }}:{{ REDIS_PORT }}
        {% else -%}
        address: {{ REDIS_HOST }}:{{ REDIS_PORT }}
        {% endif -%}
        listLength: "{{ config.get('list_length', 40) }}"
        listName: edx.{{ service }}.core.{{ variant }}
      type: redis
{% endfor %}
{% endfor %}