Skip to content

Commit

Permalink
CSS-6912: Add workload container health check (#14)
Browse files Browse the repository at this point in the history
* add workload container health check
  • Loading branch information
kelkawi-a authored Jan 26, 2024
1 parent afaba7a commit da04d68
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 98 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ header:
license:
spdx-id: Apache-2.0
copyright-owner: Canonical Ltd.
copyright-year: 2023
content: |
Copyright [year] [owner]
See LICENSE file for licensing details.
Expand Down
60 changes: 56 additions & 4 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pathlib import Path

from dotenv import dotenv_values
from ops import main
from ops import main, pebble
from ops.charm import CharmBase
from ops.model import (
ActiveStatus,
Expand All @@ -21,6 +21,7 @@
ModelError,
WaitingStatus,
)
from ops.pebble import CheckStatus

from literals import (
REQUIRED_CANDID_CONFIG,
Expand Down Expand Up @@ -51,6 +52,7 @@ def __init__(self, *args):
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.temporal_worker_pebble_ready, self._on_temporal_worker_pebble_ready)
self.framework.observe(self.on.restart_action, self._on_restart)
self.framework.observe(self.on.update_status, self._on_update_status)

@log_event_handler(logger)
def _on_temporal_worker_pebble_ready(self, event):
Expand Down Expand Up @@ -95,6 +97,48 @@ def _on_config_changed(self, event):
self.unit.status = WaitingStatus("configuring temporal worker")
self._update(event)

@log_event_handler(logger)
def _on_update_status(self, event):
    """Refresh the unit status when an `update-status` event fires.

    The handler stops early if `_validate` raises `ValueError`. If the pebble
    plan does not pass `_validate_pebble_plan`, it hands over to `_update`.
    Otherwise the unit status is derived from the pebble check named "up".

    Args:
        event: The `update-status` event triggered at intervals.
    """
    try:
        self._validate(event)
    except ValueError:
        # Validation rejected the current state; nothing more to do here.
        return

    workload = self.unit.get_container(self.name)
    if not self._validate_pebble_plan(workload):
        # The plan did not pass validation, so let `_update` handle the event.
        self._update(event)
        return

    if workload.get_check("up").status == CheckStatus.UP:
        self.unit.status = ActiveStatus(
            f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}"
        )
    else:
        self.unit.status = MaintenanceStatus("Status check: DOWN")

def _validate_pebble_plan(self, container):
    """Validate Temporal worker pebble plan.

    The plan counts as valid when the service named ``self.name`` carries a
    non-empty ``on-check-failure`` entry.

    Args:
        container: application container

    Returns:
        bool of pebble plan validity. False is also returned when the plan
        cannot be fetched because of a pebble connection error, when the plan
        is empty, or when it has no entry for this service.
    """
    try:
        # Only fetching the plan talks to pebble, so only this call is guarded.
        plan = container.get_plan().to_dict()
    except pebble.ConnectionError:
        return False

    if not plan:
        return False

    # A plan without a "services" key (for instance one that only defines
    # checks) must read as invalid instead of raising KeyError.
    services = plan.get("services") or {}
    service = services.get(self.name) or {}
    return bool(service.get("on-check-failure"))

def _process_env_file(self, event):
"""Process env file attached by user.
Expand Down Expand Up @@ -289,16 +333,23 @@ def _update(self, event):
"startup": "enabled",
"override": "replace",
"environment": context,
"on-check-failure": {"up": "ignore"},
}
},
"checks": {
"up": {
"override": "replace",
"level": "alive",
"period": "10s",
"exec": {"command": "python check_status.py"},
}
},
}

container.add_layer(self.name, pebble_layer, combine=True)
container.replan()

self.unit.status = ActiveStatus(
f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}"
)
self.unit.status = MaintenanceStatus("replanning application")


def convert_env_var(config_var, prefix="TWC_"):
Expand Down Expand Up @@ -327,6 +378,7 @@ def _setup_container(container: Container, proxy: str):
"""
resources_path = Path(__file__).parent / "resources"
_push_container_file(container, resources_path, "/worker.py", resources_path / "worker.py")
_push_container_file(container, resources_path, "/check_status.py", resources_path / "check_status.py")
_push_container_file(
container, resources_path, "/worker-dependencies.txt", resources_path / "worker-dependencies.txt"
)
Expand Down
33 changes: 33 additions & 0 deletions src/resources/check_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

"""Temporal worker status checker."""


import logging
import sys

logger = logging.getLogger(__name__)


def check_worker_status():
    """Check Temporal worker status by reading status file.

    Reads ``worker_status.txt`` from the current working directory and ends
    the process through ``sys.exit``. The exit code is 0 when the file content
    does not contain "Error". It is 1 when the content contains "Error", or
    when the file is missing or cannot be read.

    Raises:
        SystemExit: always, carrying the exit code described above.
    """
    try:
        with open("worker_status.txt", "r") as status_file:
            status = status_file.read().strip()
    except FileNotFoundError:
        logger.error("Status file not found. Worker is not running.")
        exit_code = 1
    except (OSError, UnicodeDecodeError) as err:
        # An unreadable status file is a failed check, not a crash with a
        # traceback (e.g. permission denied, or the path is a directory).
        logger.error("Status file could not be read: %s", err)
        exit_code = 1
    else:
        logger.info("Async status: %s", status)
        exit_code = 1 if "Error" in status else 0

    sys.exit(exit_code)


if __name__ == "__main__":
check_worker_status()
82 changes: 45 additions & 37 deletions src/resources/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,50 +104,58 @@ async def run_worker(unpacked_file_name, module_name):
queue=os.getenv("TWC_QUEUE"),
)

workflows = _import_modules(
"workflows",
unpacked_file_name=unpacked_file_name,
module_name=module_name,
supported_modules=os.getenv("TWC_SUPPORTED_WORKFLOWS").split(","),
)
activities = _import_modules(
"activities",
unpacked_file_name=unpacked_file_name,
module_name=module_name,
supported_modules=os.getenv("TWC_SUPPORTED_ACTIVITIES").split(","),
)
try:
workflows = _import_modules(
"workflows",
unpacked_file_name=unpacked_file_name,
module_name=module_name,
supported_modules=os.getenv("TWC_SUPPORTED_WORKFLOWS").split(","),
)
activities = _import_modules(
"activities",
unpacked_file_name=unpacked_file_name,
module_name=module_name,
supported_modules=os.getenv("TWC_SUPPORTED_ACTIVITIES").split(","),
)

if os.getenv("TWC_TLS_ROOT_CAS").strip() != "":
client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS")
if os.getenv("TWC_TLS_ROOT_CAS").strip() != "":
client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS")

if os.getenv("TWC_AUTH_PROVIDER").strip() != "":
client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header())
if os.getenv("TWC_AUTH_PROVIDER").strip() != "":
client_config.auth = AuthOptions(provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header())

if os.getenv("TWC_ENCRYPTION_KEY").strip() != "":
client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True)
if os.getenv("TWC_ENCRYPTION_KEY").strip() != "":
client_config.encryption = EncryptionOptions(key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True)

worker_opt = None
dsn = os.getenv("TWC_SENTRY_DSN").strip()
if dsn != "":
sentry = SentryOptions(
dsn=dsn,
release=os.getenv("TWC_SENTRY_RELEASE").strip() or None,
environment=os.getenv("TWC_SENTRY_ENVIRONMENT").strip() or None,
redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS"),
)
worker_opt = None
dsn = os.getenv("TWC_SENTRY_DSN").strip()
if dsn != "":
sentry = SentryOptions(
dsn=dsn,
release=os.getenv("TWC_SENTRY_RELEASE").strip() or None,
environment=os.getenv("TWC_SENTRY_ENVIRONMENT").strip() or None,
redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS"),
)

worker_opt = WorkerOptions(sentry=sentry)
worker_opt = WorkerOptions(sentry=sentry)

client = await Client.connect(client_config)
client = await Client.connect(client_config)

worker = Worker(
client=client,
task_queue=os.getenv("TWC_QUEUE"),
workflows=workflows,
activities=activities,
worker_opt=worker_opt,
)
await worker.run()
worker = Worker(
client=client,
task_queue=os.getenv("TWC_QUEUE"),
workflows=workflows,
activities=activities,
worker_opt=worker_opt,
)

with open("worker_status.txt", "w") as status_file:
status_file.write("Success")
await worker.run()
except Exception as e:
# If an error occurs, write the error message to the status file
with open("worker_status.txt", "w") as status_file:
status_file.write(f"Error: {e}")


if __name__ == "__main__": # pragma: nocover
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ async def deploy(ops_test: OpsTest):
url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233)
await ops_test.model.applications[APP_NAME].set_config({"host": url})

await attach_worker_resource_file(ops_test)
await attach_worker_resource_file(ops_test, rsc_type="workflows")
23 changes: 12 additions & 11 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def run_sample_workflow(ops_test: OpsTest):
url = await get_application_url(ops_test, application=APP_NAME_SERVER, port=7233)
logger.info("running workflow on app address: %s", url)

client = await Client.connect(Options(host=url, queue=WORKER_CONFIG["queue"], namespace="default"))
client = await Client.connect(Options(host=url, queue=WORKER_CONFIG["queue"], namespace=WORKER_CONFIG["namespace"]))

# Execute workflow
name = "Jean-luc"
Expand Down Expand Up @@ -113,16 +113,17 @@ async def scale(ops_test: OpsTest, app, units):
"""
await ops_test.model.applications[app].scale(scale=units)

# Wait for model to settle
await ops_test.model.wait_for_idle(
apps=[app],
status="active",
idle_period=30,
raise_on_error=False,
raise_on_blocked=True,
timeout=300,
wait_for_exact_units=units,
)
async with ops_test.fast_forward():
# Wait for model to settle
await ops_test.model.wait_for_idle(
apps=[app],
status="active",
idle_period=30,
raise_on_error=False,
raise_on_blocked=True,
timeout=600,
wait_for_exact_units=units,
)

assert len(ops_test.model.applications[app].units) == units

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ async def test_basic_client(self, ops_test: OpsTest):
await run_sample_workflow(ops_test)

async def test_invalid_env_file(self, ops_test: OpsTest):
    """Attaches an invalid .env file to the worker."""
    # Only the attach step is exercised here, using the "env-file" resource type.
    await attach_worker_resource_file(ops_test, rsc_type="env-file")
Loading

0 comments on commit da04d68

Please sign in to comment.