From 028612123d9be45fcbfa31f28d241f7c2226d33e Mon Sep 17 00:00:00 2001
From: Florent Chehab
Date: Mon, 8 Feb 2021 19:22:28 +0100
Subject: [PATCH] Parametrized keda task concurrency in chart (#13571)

* Rely on the config.celery.worker_concurrency value to determine the number of tasks a KEDA worker can take (vs the 16 that was previously hardcoded in the query).
* Updated documentation accordingly
---
 chart/README.md                                    | 16 +++++++++--
 .../workers/worker-kedaautoscaler.yaml             |  8 +++---
 chart/tests/test_keda.py                           | 27 +++++++++++++++++++
 chart/values.yaml                                  |  1 +
 4 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/chart/README.md b/chart/README.md
index e5356537285e2..2d0c166114934 100644
--- a/chart/README.md
+++ b/chart/README.md
@@ -319,10 +319,11 @@ helm install --name my-release \
 
 ## Autoscaling with KEDA
 
+*This feature is still experimental.*
+
 KEDA stands for Kubernetes Event Driven Autoscaling. [KEDA](https://github.com/kedacore/keda) is a custom controller that
 allows users to create custom bindings to the Kubernetes [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).
-We've built an experimental scaler that allows users to create scalers based on postgreSQL queries. For the moment this exists
-on a separate branch, but will be merged upstream soon. To install our custom version of KEDA on your cluster, please run
+We've built a scaler that allows users to create scaling rules based on PostgreSQL queries and shared it with the community. This enables the chart to scale the number of Airflow workers it deploys on Kubernetes according to the number of tasks that are `queued` or `running`.
 
 ```bash
 helm repo add kedacore https://kedacore.github.io/charts
@@ -349,6 +350,17 @@ helm install airflow . \
     --set workers.persistence.enabled=false
 ```
 
+KEDA will derive the desired number of Celery workers by querying the Airflow metadata database:
+
+```sql
+SELECT
+    ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }})
+FROM task_instance
+WHERE state='running' OR state='queued'
+```
+
+You should set the Celery worker concurrency through the Helm value `config.celery.worker_concurrency` (i.e. not through `airflow.cfg` or environment variables) so that the KEDA trigger stays consistent with the concurrency the workers actually use.
+
 ## Using an external redis instance
 
 When using the `CeleryExecutor` or the `CeleryKubernetesExecutor` the chart will by default create a redis Deployment/StatefulSet alongside airflow.
diff --git a/chart/templates/workers/worker-kedaautoscaler.yaml b/chart/templates/workers/worker-kedaautoscaler.yaml
index 1493133f6a807..9533ba93ca966 100644
--- a/chart/templates/workers/worker-kedaautoscaler.yaml
+++ b/chart/templates/workers/worker-kedaautoscaler.yaml
@@ -36,13 +36,13 @@ metadata:
 spec:
   scaleTargetRef:
     deploymentName: {{ .Release.Name }}-worker
-  pollingInterval: {{ .Values.workers.keda.pollingInterval }}   # Optional. Default: 30 seconds
-  cooldownPeriod: {{ .Values.workers.keda.cooldownPeriod }}   # Optional. Default: 300 seconds
-  maxReplicaCount: {{ .Values.workers.keda.maxReplicaCount }}   # Optional. Default: 100
+  pollingInterval: {{ .Values.workers.keda.pollingInterval }}  # Optional. Default: 30 seconds
+  cooldownPeriod: {{ .Values.workers.keda.cooldownPeriod }}  # Optional. Default: 300 seconds
+  maxReplicaCount: {{ .Values.workers.keda.maxReplicaCount }}  # Optional. Default: 100
   triggers:
     - type: postgresql
       metadata:
         targetQueryValue: "1"
         connection: AIRFLOW_CONN_AIRFLOW_DB
-        query: "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance WHERE state='running' OR state='queued'"
+        query: "SELECT ceil(COUNT(*)::decimal / {{ .Values.config.celery.worker_concurrency }}) FROM task_instance WHERE state='running' OR state='queued'"
 {{- end }}
diff --git a/chart/tests/test_keda.py b/chart/tests/test_keda.py
index 132439da40cc9..e645f9214bba4 100644
--- a/chart/tests/test_keda.py
+++ b/chart/tests/test_keda.py
@@ -55,3 +55,30 @@ def test_keda_enabled(self, executor, is_created):
             assert "RELEASE-NAME-worker" == jmespath.search("metadata.name", docs[0])
         else:
             assert docs == []
+
+    @parameterized.expand(
+        [
+            ("CeleryExecutor", 8),
+            ("CeleryExecutor", 16),
+            ("CeleryKubernetesExecutor", 8),
+            ("CeleryKubernetesExecutor", 16),
+        ]
+    )
+    def test_keda_concurrency(self, executor, concurrency):
+        """
+        The configured Celery worker concurrency should be propagated to the KEDA trigger query
+        """
+        docs = render_chart(
+            values={
+                "workers": {"keda": {"enabled": True}, "persistence": {"enabled": False}},
+                "executor": executor,
+                "config": {"celery": {"worker_concurrency": concurrency}},
+            },
+            show_only=["templates/workers/worker-kedaautoscaler.yaml"],
+            validate_schema=False,
+        )
+        expected_query = (
+            f"SELECT ceil(COUNT(*)::decimal / {concurrency}) "
+            "FROM task_instance WHERE state='running' OR state='queued'"
+        )
+        self.assertEqual(expected_query, jmespath.search("spec.triggers[0].metadata.query", docs[0]))
diff --git a/chart/values.yaml b/chart/values.yaml
index 7220f89fca69e..764fdfd6abda0 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -711,6 +711,7 @@ config:
     rbac: 'True'
   celery:
     default_queue: celery
+    worker_concurrency: 16
   scheduler:
     scheduler_heartbeat_sec: 5
     # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
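
For reviewers who want to check the change locally, here is a minimal sketch (assuming Helm 3 for `--show-only`, an illustrative release name `airflow`, an example concurrency of `8`, and that the command is run from the `chart/` directory) that renders only the KEDA `ScaledObject` so the divisor in the trigger query can be inspected:

```bash
# Render only the KEDA ScaledObject with a custom Celery worker concurrency;
# the divisor in spec.triggers[0].metadata.query should match the value set below.
helm template airflow . \
    --set executor=CeleryExecutor \
    --set workers.keda.enabled=true \
    --set workers.persistence.enabled=false \
    --set config.celery.worker_concurrency=8 \
    --show-only templates/workers/worker-kedaautoscaler.yaml
```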