CSS-5032: Use Temporal lib worker #9

Merged
merged 5 commits on Aug 9, 2023
Changes from 3 commits
20 changes: 20 additions & 0 deletions README.md
@@ -74,6 +74,10 @@ user before the charm can be started. This can be done as follows:
```bash
juju run temporal-worker-k8s/0 add-workflows workflows="GreetingWorkflow"
juju run temporal-worker-k8s/0 add-activities activities="compose_greeting"

# To support all defined workflows and activities, use the 'all' keyword
juju run temporal-worker-k8s/0 add-workflows workflows="all"
juju run temporal-worker-k8s/0 add-activities activities="all"
```

Once done, the charm should enter an active state, indicating that the worker is
@@ -84,6 +88,12 @@ pod to ensure there are no errors with the workload container:
kubectl -n <juju_model_name> logs temporal-worker-k8s-0 -c temporal-worker -f
```

Note: Files under the "workflows" directory must only contain classes decorated
with the `@workflow.defn` decorator. Files under the "activities" directory must
only contain methods decorated with the `@activity.defn` decorator. Any additional
helper methods or classes should be defined in other files.
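
For illustration, a minimal sketch of such a pair of files is shown below; the file names are hypothetical, and `GreetingWorkflow`/`compose_greeting` follow the example above:

```python
# workflows/greetings.py (hypothetical file name) -- only the @workflow.defn class lives here.
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Delegate the actual work to an activity registered with the worker.
        return await workflow.execute_activity(
            "compose_greeting",
            name,
            start_to_close_timeout=timedelta(seconds=10),
        )


# activities/greetings.py (hypothetical file name) -- only the @activity.defn method lives here.
from temporalio import activity


@activity.defn
async def compose_greeting(name: str) -> str:
    return f"Hello, {name}!"
```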

## Verifying

To verify that the setup is running correctly, run `juju status --watch 1s` and
@@ -102,6 +112,16 @@ To add more replicas you can use the juju scale-application functionality, i.e.
juju scale-application temporal-worker-k8s <num_of_replicas_required_replicas>
```

## Error Monitoring

The Temporal worker operator has a built-in Sentry interceptor which can be used
to intercept and capture errors from the Temporal SDK. To enable it, run the
following command:

```bash
juju config temporal-worker-k8s sentry-dsn=<YOUR_SENTRY_DSN>
```
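
Optionally, the related `sentry-release` and `sentry-environment` options (added in `config.yaml` below) can be set in the same way; the values here are placeholders:

```bash
# Optional: tag Sentry events with a release and environment (placeholder values).
juju config temporal-worker-k8s sentry-release="1.0.0" sentry-environment="staging"
```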

## Contributing

This charm is still in active development. Please see the
3 changes: 3 additions & 0 deletions actions.yaml
@@ -48,3 +48,6 @@ remove-activities:

list-activities:
description: Return list of activities supported by worker.

restart:
description: Restart the Temporal worker.
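
For example, once deployed, the new action can be invoked as follows (unit 0 assumed):

```bash
juju run temporal-worker-k8s/0 restart
```
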
15 changes: 15 additions & 0 deletions config.yaml
@@ -31,6 +31,21 @@ options:
default: ""
type: string

sentry-dsn:
description: Sentry Data Source Name to send events to.
default: ""
type: string

sentry-release:
description: The version of your code deployed to an environment.
default: ""
type: string

sentry-environment:
description: The environment to log errors to in Sentry.
default: ""
type: string

workflows-file-name:
description: Name of the wheel file resource attached to the charm.
default: ""
43 changes: 36 additions & 7 deletions src/charm.py
@@ -14,7 +14,14 @@
from dotenv import dotenv_values
from ops import main
from ops.charm import CharmBase
from ops.model import ActiveStatus, BlockedStatus, Container, ModelError, WaitingStatus
from ops.model import (
ActiveStatus,
BlockedStatus,
Container,
MaintenanceStatus,
ModelError,
WaitingStatus,
)

from actions.activities import ActivitiesActions
from actions.workflows import WorkflowsActions
@@ -47,6 +54,7 @@ def __init__(self, *args):
self.framework.observe(self.on.peer_relation_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.temporal_worker_pebble_ready, self._on_temporal_worker_pebble_ready)
self.framework.observe(self.on.restart_action, self._on_restart)

self.workflows_actions = WorkflowsActions(self)
self.activities_actions = ActivitiesActions(self)
@@ -70,6 +78,26 @@ def _on_temporal_worker_pebble_ready(self, event):

self._update(event)

@log_event_handler(logger)
def _on_restart(self, event):
"""Restart Temporal worker action handler.

Args:
event: The event triggered by the restart action.
"""
container = self.unit.get_container(self.name)
if not container.can_connect():
event.defer()
return

self.unit.status = MaintenanceStatus("restarting worker")
container.restart(self.name)
self.unit.status = ActiveStatus(
f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}"
)

event.set_results({"result": "worker successfully restarted"})

@log_event_handler(logger)
def _on_peer_relation_changed(self, event):
"""Handle peer relation changed event.
@@ -284,20 +312,19 @@ def _update(self, event):
self.name: {
"summary": "temporal worker",
"command": command,
"startup": "disabled",
"startup": "enabled",
"override": "replace",
"environment": self._state.env or {},
}
},
}

container.add_layer(self.name, pebble_layer, combine=True)
if container.get_service(self.name).is_running():
container.replan()
else:
container.start(self.name)
container.replan()

self.unit.status = ActiveStatus()
self.unit.status = ActiveStatus(
f"worker listening to namespace {self.config['namespace']!r} on queue {self.config['queue']!r}"
)


def _setup_container(container: Container):
@@ -307,7 +334,9 @@ def _setup_container(container: Container):
container: Container unit on which to perform action.
"""
resources_path = Path(__file__).parent / "resources"
_push_container_file(container, resources_path, "/__init__.py", resources_path / "__init__.py")
_push_container_file(container, resources_path, "/worker.py", resources_path / "worker.py")
_push_container_file(container, resources_path, "/sentry_interceptor.py", resources_path / "sentry_interceptor.py")
_push_container_file(
container, resources_path, "/worker-dependencies.txt", resources_path / "worker-dependencies.txt"
)
5 changes: 5 additions & 0 deletions src/resources/__init__.py
@@ -0,0 +1,5 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.


"""Temporal worker operator resources."""
91 changes: 91 additions & 0 deletions src/resources/sentry_interceptor.py
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

# flake8: noqa
# pylint: skip-file

"""Temporal client worker Sentry interceptor."""

from dataclasses import asdict, is_dataclass
from typing import Any, Optional, Type, Union

from temporalio import activity, workflow
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
ExecuteWorkflowInput,
Interceptor,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)

with workflow.unsafe.imports_passed_through():
from sentry_sdk import Hub, capture_exception, set_context, set_tag


def _set_common_workflow_tags(info: Union[workflow.Info, activity.Info]):
set_tag("temporal.workflow.type", info.workflow_type)
set_tag("temporal.workflow.id", info.workflow_id)


class _SentryActivityInboundInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
# https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues
with Hub(Hub.current):
set_tag("temporal.execution_type", "activity")
set_tag("module", input.fn.__module__ + "." + input.fn.__qualname__)

activity_info = activity.info()
_set_common_workflow_tags(activity_info)
set_tag("temporal.activity.id", activity_info.activity_id)
set_tag("temporal.activity.type", activity_info.activity_type)
set_tag("temporal.activity.task_queue", activity_info.task_queue)
set_tag("temporal.workflow.namespace", activity_info.workflow_namespace)
set_tag("temporal.workflow.run_id", activity_info.workflow_run_id)
try:
return await super().execute_activity(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
set_context("temporal.activity.input", asdict(input.args[0]))
set_context("temporal.activity.info", activity.info().__dict__)
capture_exception()
raise e


class _SentryWorkflowInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
# https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues
with Hub(Hub.current):
set_tag("temporal.execution_type", "workflow")
set_tag("module", input.run_fn.__module__ + "." + input.run_fn.__qualname__)
workflow_info = workflow.info()
_set_common_workflow_tags(workflow_info)
set_tag("temporal.workflow.task_queue", workflow_info.task_queue)
set_tag("temporal.workflow.namespace", workflow_info.namespace)
set_tag("temporal.workflow.run_id", workflow_info.run_id)
try:
return await super().execute_workflow(input)
except Exception as e:
if len(input.args) == 1 and is_dataclass(input.args[0]):
set_context("temporal.workflow.input", asdict(input.args[0]))
set_context("temporal.workflow.info", workflow.info().__dict__)

if not workflow.unsafe.is_replaying():
with workflow.unsafe.sandbox_unrestricted():
capture_exception()
raise e


class SentryInterceptor(Interceptor):
"""Temporal Interceptor class which will report workflow & activity exceptions to Sentry."""

def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
"""Implement :py:meth:`temporalio.worker.Interceptor.intercept_activity`."""
return _SentryActivityInboundInterceptor(super().intercept_activity(next))

def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
"""Retrieve the workflow interceptor class based on the provided input."""
return _SentryWorkflowInterceptor
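
As a usage note, a minimal standalone sketch of wiring `SentryInterceptor` into a worker is shown below; the host, queue, and DSN are placeholder assumptions, and the charm's `worker.py` change below does the equivalent, driven by the `sentry-dsn` config option:

```python
import asyncio

import sentry_sdk
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

from sentry_interceptor import SentryInterceptor


@activity.defn
async def compose_greeting(name: str) -> str:
    return f"Hello, {name}!"


async def main() -> None:
    # Placeholder DSN; in the charm this value comes from the sentry-dsn config option.
    sentry_sdk.init(dsn="https://<key>@<org>.ingest.sentry.io/<project>")
    client = await Client.connect("localhost:7233")  # assumed local Temporal server
    worker = Worker(
        client,
        task_queue="test-queue",  # assumed queue name
        activities=[compose_greeting],
        interceptors=[SentryInterceptor()],  # exceptions are captured and sent to Sentry
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
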
1 change: 1 addition & 0 deletions src/resources/worker-dependencies.txt
@@ -1,3 +1,4 @@
temporalio==1.1.0
temporal-lib-py==1.1.0
protobuf==3.20.0
sentry-sdk==1.29.2
15 changes: 14 additions & 1 deletion src/resources/worker.py
@@ -13,6 +13,8 @@
import sys
from importlib import import_module

import sentry_sdk
from sentry_interceptor import SentryInterceptor
from temporalio.worker import Worker
from temporallib.auth import (
AuthOptions,
@@ -120,7 +122,17 @@ async def run_worker(charm_config, supported_workflows, supported_activities, mo
client_config.auth = AuthOptions(provider=charm_config["auth-provider"], config=_get_auth_header(charm_config))

if charm_config["encryption-key"].strip() != "":
client_config.encryption = EncryptionOptions(key=charm_config["encryption-key"])
client_config.encryption = EncryptionOptions(key=charm_config["encryption-key"], compress=True)

interceptors = []
dsn = charm_config["sentry-dsn"].strip()
if dsn != "":
interceptors = [SentryInterceptor()]
sentry_sdk.init(
dsn=dsn,
release=charm_config["sentry-release"].strip() or None,
environment=charm_config["sentry-environment"].strip() or None,
)

client = await Client.connect(client_config)

@@ -129,6 +141,7 @@
task_queue=charm_config["queue"],
workflows=workflows,
activities=activities,
interceptors=interceptors,
)
await worker.run()

5 changes: 5 additions & 0 deletions tests/integration/temporal_client/__init__.py
@@ -0,0 +1,5 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.


"""Temporal client."""
14 changes: 9 additions & 5 deletions tests/unit/test_charm.py
@@ -65,6 +65,9 @@ def test_ready(self, _process_wheel_file, _setup_container):
"host": "test-host",
"namespace": "test-namespace",
"queue": "test-queue",
"sentry-dsn": "",
"sentry-release": "",
"sentry-environment": "",
"workflows-file-name": "python_samples-1.1.0-py3-none-any.whl",
"encryption-key": "",
"auth-enabled": True,
@@ -92,8 +95,6 @@ def test_ready(self, _process_wheel_file, _setup_container):
sa = json.loads(state["supported_activities"])
module_name = json.loads(state["module_name"])

self.assertEqual(harness.model.unit.status, ActiveStatus())

command = f"python worker.py '{json.dumps(dict(config))}' '{','.join(sw)}' '{','.join(sa)}' {module_name}"

# The plan is generated after pebble is ready.
@@ -102,7 +103,7 @@
"temporal-worker": {
"summary": "temporal worker",
"command": command,
"startup": "disabled",
"startup": "enabled",
"override": "replace",
}
},
@@ -114,8 +115,11 @@
service = harness.model.unit.get_container(CONTAINER_NAME).get_service("temporal-worker")
self.assertTrue(service.is_running())

# The ActiveStatus is set with no message.
self.assertEqual(harness.model.unit.status, ActiveStatus())
# The ActiveStatus is set.
self.assertEqual(
harness.model.unit.status,
ActiveStatus(f"worker listening to namespace {config['namespace']!r} on queue {config['queue']!r}"),
)


def simulate_lifecycle(harness, config):