Skip to content

Commit

Permalink
Parametrized keda task concurrency in chart (#13571)
Browse files Browse the repository at this point in the history
* Rely on the config.celery.worker_concurrency value
to determine the number of task a keda worker can take
(vs the previous 16 that was hardcoded in the query).
* Updated documentation accordingly
  • Loading branch information
Florent Chehab authored Feb 8, 2021
1 parent e7a2e35 commit 0286121
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
16 changes: 14 additions & 2 deletions chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,11 @@ helm install --name my-release \

## Autoscaling with KEDA

*This feature is still experimental.*

KEDA stands for Kubernetes Event Driven Autoscaling. [KEDA](https://github.com/kedacore/keda) is a custom controller that allows users to create custom bindings
to the Kubernetes [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).
We've built an experimental scaler that allows users to create scalers based on postgreSQL queries. For the moment this exists
on a separate branch, but will be merged upstream soon. To install our custom version of KEDA on your cluster, please run
We've built a scaler that allows users to create scalers based on postgreSQL queries and shared it with the community. This enables us to scale the number of airflow workers deployed on Kubernetes by this chart depending on the number of task that are `queued` or `running`.

```bash
helm repo add kedacore https://kedacore.github.io/charts
Expand All @@ -349,6 +350,17 @@ helm install airflow . \
--set workers.persistence.enabled=false
```

KEDA will derive the desired number of celery workers by querying Airflow metadata database:

```sql
SELECT
ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }})
FROM task_instance
WHERE state='running' OR state='queued'
```

You should set celery worker concurrency through the helm value `config.celery.worker_concurrency` (i.e. instead of airflow.cfg or environment variables) so that the KEDA trigger will be consistent with the worker concurrency setting.

## Using an external redis instance

When using the `CeleryExecutor` or the `CeleryKubernetesExecutor` the chart will by default create a redis Deployment/StatefulSet alongside airflow.
Expand Down
8 changes: 4 additions & 4 deletions chart/templates/workers/worker-kedaautoscaler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ metadata:
spec:
scaleTargetRef:
deploymentName: {{ .Release.Name }}-worker
pollingInterval: {{ .Values.workers.keda.pollingInterval }} # Optional. Default: 30 seconds
cooldownPeriod: {{ .Values.workers.keda.cooldownPeriod }} # Optional. Default: 300 seconds
maxReplicaCount: {{ .Values.workers.keda.maxReplicaCount }} # Optional. Default: 100
pollingInterval: {{ .Values.workers.keda.pollingInterval }} # Optional. Default: 30 seconds
cooldownPeriod: {{ .Values.workers.keda.cooldownPeriod }} # Optional. Default: 300 seconds
maxReplicaCount: {{ .Values.workers.keda.maxReplicaCount }} # Optional. Default: 100
triggers:
- type: postgresql
metadata:
targetQueryValue: "1"
connection: AIRFLOW_CONN_AIRFLOW_DB
query: "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance WHERE state='running' OR state='queued'"
query: "SELECT ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }}) FROM task_instance WHERE state='running' OR state='queued'"
{{- end }}
27 changes: 27 additions & 0 deletions chart/tests/test_keda.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,30 @@ def test_keda_enabled(self, executor, is_created):
assert "RELEASE-NAME-worker" == jmespath.search("metadata.name", docs[0])
else:
assert docs == []

@parameterized.expand(
[
("CeleryExecutor", 8),
("CeleryExecutor", 16),
("CeleryKubernetesExecutor", 8),
("CeleryKubernetesExecutor", 16),
]
)
def test_keda_concurrency(self, executor, concurrency):
"""
ScaledObject should only be created when set to enabled and executor is Celery or CeleryKubernetes
"""
docs = render_chart(
values={
"workers": {"keda": {"enabled": True}, "persistence": {"enabled": False}},
"executor": executor,
"config": {"celery": {"worker_concurrency": concurrency}},
},
show_only=["templates/workers/worker-kedaautoscaler.yaml"],
validate_schema=False,
)
expected_query = (
f"SELECT ceil(COUNT(*)::decimal / {concurrency}) "
"FROM task_instance WHERE state='running' OR state='queued'"
)
self.assertEqual(expected_query, jmespath.search("spec.triggers[0].metadata.query", docs[0]))
1 change: 1 addition & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ config:
rbac: 'True'
celery:
default_queue: celery
worker_concurrency: 16
scheduler:
scheduler_heartbeat_sec: 5
# statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
Expand Down

0 comments on commit 0286121

Please sign in to comment.