diff --git a/changelog.d/20250127_221956_danfuchs_sentry_take3.md b/changelog.d/20250127_221956_danfuchs_sentry_take3.md
new file mode 100644
index 00000000..d85bbe65
--- /dev/null
+++ b/changelog.d/20250127_221956_danfuchs_sentry_take3.md
@@ -0,0 +1,3 @@
+### Backwards-incompatible changes
+
+- Instrument tracing and exception handling with Sentry. All timings are now calculated with Sentry tracing functionality, and all Slack notifications for errors come from Sentry instead of the Safir `SlackException` machinery.
diff --git a/docs/development/index.rst b/docs/development/index.rst
index f5a838fb..f2a178f9 100644
--- a/docs/development/index.rst
+++ b/docs/development/index.rst
@@ -8,3 +8,4 @@ Here are some guides on developing and testing mobu.
idfdev
github
+ sentry
diff --git a/docs/development/sentry.rst b/docs/development/sentry.rst
new file mode 100644
index 00000000..1b18d44c
--- /dev/null
+++ b/docs/development/sentry.rst
@@ -0,0 +1,32 @@
+##################
+Sentry Integration
+##################
+
+Mobu integrates with Sentry differently than most other apps, because the unit of work isn't a request, as it is in a web app; it's an iteration (or part of an iteration) of a business.
+As a result, some things are different about this Sentry integration.
+
+Scopes and Transactions
+=======================
+
+`Isolation scopes <https://docs.sentry.io/platforms/python/enriching-events/scopes/>`_ and `transactions <https://docs.sentry.io/platforms/python/tracing/>`_ are created manually.
+All tags and contexts are set on the isolation scope, and any exceptions are manually captured in the monkey runner loop.
+
+* There is one isolation scope for every execution of a business's ``run`` method.
+* There is one transaction for every execution of a business's ``startup`` method.
+* There is one transaction for every execution of a business's ``shutdown`` method.
+* For ``NotebookRunner`` businesses, a transaction is created for every notebook execution.
+* For other businesses, a transaction is created for each invocation of their ``execute`` methods.
+
+Fingerprints
+============
+
+We add tags to the fingerprint of every event sent to Sentry to force the creation of separate issues where only one issue would be created by default.
+For example, we add notebook and cell ids to the fingerprint to force different issues (and thus different notifications) for errors from different notebooks and cells.
+
+Traces Sampling
+===============
+
+The ``sentry_traces_sample_config``/``sentryTracesSampleConfig`` option can be set to:
+
+* A float, in which case it will be passed directly as `traces_sample_rate <https://docs.sentry.io/platforms/python/configuration/options/#traces-sample-rate>`_ when initializing Sentry, and only that percentage of transactions will be sent to Sentry
+* The string "errors", in which case transactions will be sent to Sentry only if errors occurred during them
diff --git a/docs/index.rst b/docs/index.rst
index 4415af1c..85bc3c81 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -17,7 +17,7 @@ Mobu
####
mobu (short for "monkey business") is a continous integration testing framework for the `Rubin Science Platform `__ .
-It attempts to simulate user interactions with Science Platform services continuously, recording successes and failures and optionally reporting failures to a Slack channel via a Slack incoming webhook.
+It attempts to simulate user interactions with Science Platform services continuously, recording successes and failures and reporting failures to `Sentry `_.
It runs some number of "monkeys" that simulate a random user of the Science Platform.
Those monkeys are organized into "flocks" that share a single configuration across all of the monkeys.
It can be used for both monitoring and synthetic load testing.
diff --git a/src/mobu/config.py b/src/mobu/config.py
index 44bcb1cb..0bb2b753 100644
--- a/src/mobu/config.py
+++ b/src/mobu/config.py
@@ -4,7 +4,7 @@
from pathlib import Path
from textwrap import dedent
-from typing import Self
+from typing import Literal, Self
import yaml
from pydantic import AliasChoices, Field, HttpUrl
@@ -182,6 +182,42 @@ class Configuration(BaseSettings):
),
)
+ sentry_dsn: str | None = Field(
+ None,
+ title="Sentry DSN",
+ description="The Sentry DSN: https://docs.sentry.io/platforms/python/#configure",
+ examples=[
+ "https://foo@bar.ingest.us.sentry.io/123456",
+ ],
+ validation_alias=AliasChoices("MOBU_SENTRY_DSN", "mobuSentryDsn"),
+ )
+
+ sentry_traces_sample_config: float | Literal["errors"] = Field(
+ 0,
+ title="Sentry traces sample config",
+ description=(
+ "Set the Sentry sampling strategy for traces. If this is a float,"
+ " it will be passed as the traces_sample_rate: https://docs.sentry.io/platforms/python/configuration/sampling/#configuring-the-transaction-sample-rate"
+ ' If this is set to "errors", then all transactions during which'
+ " an error occurred will be sent."
+ ),
+ examples=[0, 0.5, "errors"],
+ validation_alias=AliasChoices(
+ "MOBU_SENTRY_TRACES_SAMPLE_CONFIG", "sentryTracesSampleConfig"
+ ),
+ )
+
+ sentry_environment: str = Field(
+ ...,
+ title="Sentry environment",
+ description=(
+ "The Sentry environment: https://docs.sentry.io/concepts/key-terms/environments/"
+ ),
+ validation_alias=AliasChoices(
+ "MOBU_SENTRY_ENVIRONMENT", "sentryEnvironment"
+ ),
+ )
+
gafaelfawr_token: str | None = Field(
None,
title="Gafaelfawr admin token",
diff --git a/src/mobu/constants.py b/src/mobu/constants.py
index cfc3c4db..f0189fce 100644
--- a/src/mobu/constants.py
+++ b/src/mobu/constants.py
@@ -49,3 +49,6 @@
"^[a-z0-9](?:[a-z0-9]|-[a-z0-9])*[a-z](?:[a-z0-9]|-[a-z0-9])*$"
)
"""Regex matching all valid usernames."""
+
+SENTRY_ERRORED_KEY = "errored"
+"""Tag name to set on transactions that had exceptions."""
diff --git a/src/mobu/exceptions.py b/src/mobu/exceptions.py
index ffe98e49..68aa98a7 100644
--- a/src/mobu/exceptions.py
+++ b/src/mobu/exceptions.py
@@ -2,91 +2,27 @@
from __future__ import annotations
-import datetime
-import json
-import re
from pathlib import Path
-from typing import Self, override
+from typing import Self
-import rubin.nublado.client.exceptions as ne
from fastapi import status
from pydantic import ValidationError
from safir.fastapi import ClientRequestError
from safir.models import ErrorLocation
-from safir.slack.blockkit import (
- SlackBaseBlock,
- SlackBaseField,
- SlackCodeBlock,
- SlackMessage,
- SlackTextBlock,
- SlackTextField,
-)
-
-_ANSI_REGEX = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")
-"""Regex that matches ANSI escape sequences."""
+from safir.sentry import SentryException, SentryWebException
__all__ = [
- "CodeExecutionError",
"ComparisonError",
"FlockNotFoundError",
"GafaelfawrParseError",
"GafaelfawrWebError",
"GitHubFileNotFoundError",
- "JupyterProtocolError",
- "JupyterTimeoutError",
- "JupyterWebError",
- "MobuMixin",
- "MobuSlackException",
- "MobuSlackWebException",
"MonkeyNotFoundError",
"SubprocessError",
- "TAPClientError",
]
-def _remove_ansi_escapes(string: str) -> str:
- """Remove ANSI escape sequences from a string.
-
- Jupyter labs like to format error messages with lots of ANSI escape
- sequences, and Slack doesn't like that in messages (nor do humans want to
- see them). Strip them out.
-
- Based on `this StackOverflow answer
- `__.
-
- Parameters
- ----------
- string
- String to strip ANSI escapes from.
-
- Returns
- -------
- str
- Sanitized string.
- """
- return _ANSI_REGEX.sub("", string)
-
-
-class MobuMixin:
- """Mixin class to add `event` and `monkey` fields to Exception."""
-
- def mobu_init(
- self, event: str | None = None, monkey: str | None = None
- ) -> None:
- """Initialize mobu-specific fields."""
- self.event: str | None = event
- self.monkey: str | None = monkey
-
- def mobu_fields(self) -> list[SlackBaseField]:
- fields: list[SlackBaseField] = []
- if self.event:
- fields.append(SlackTextField(heading="Event", text=self.event))
- if self.monkey:
- fields.append(SlackTextField(heading="Monkey", text=self.monkey))
- return fields
-
-
-class GafaelfawrParseError(ne.NubladoClientSlackException):
+class GafaelfawrParseError(SentryException):
"""Unable to parse the reply from Gafaelfawr.
Parameters
@@ -123,25 +59,14 @@ def from_exception(
def __init__(
self, message: str, error: str, user: str | None = None
) -> None:
- super().__init__(message, user)
- self.error = error
-
- @override
- def to_slack(self) -> SlackMessage:
- """Convert to a Slack message for Slack alerting.
+ super().__init__(message)
+ if user:
+ self.contexts["validation_info"] = {"error": error}
- Returns
- -------
- SlackMessage
- Slack message suitable for posting as an alert.
- """
- message = super().to_slack()
- block = SlackCodeBlock(heading="Error", code=self.error)
- message.blocks.append(block)
- return message
+ self.error = error
-class GafaelfawrWebError(ne.NubladoClientSlackWebException):
+class GafaelfawrWebError(SentryWebException):
"""An API call to Gafaelfawr failed."""
@@ -169,305 +94,13 @@ def __init__(self, monkey: str) -> None:
super().__init__(msg, ErrorLocation.path, ["monkey"])
-class MobuSlackException(ne.NubladoClientSlackException, MobuMixin):
- """Represents an exception that can be reported to Slack.
-
- This adds some additional fields to `~safir.slack.blockkit.SlackException`
- but is otherwise equivalent. It is intended to be subclassed. Subclasses
- must override the `to_slack` method.
-
- Parameters
- ----------
- msg
- Exception message.
- user
- User mobu was operating as when the exception happened.
- started_at
- When the operation started.
- failed_at
- When the operation failed (defaults to the current time).
-
- Attributes
- ----------
- started_at
- When the operation that ended in an exception started.
- failed_at
- When the operation that ended in an exception failed
- (defaults to the current time).
- monkey
- The running monkey in which the exception happened.
- event
- Name of the business event that provoked the exception.
- annotations
- Additional annotations for the running business.
- """
-
- def __init__(
- self,
- msg: str,
- user: str | None = None,
- *,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- monkey: str | None = None,
- event: str | None = None,
- ) -> None:
- super().__init__(msg, user, failed_at=failed_at, started_at=started_at)
- self.mobu_init(monkey=monkey, event=event)
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.NubladoClientSlackException,
- *,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
-
- Returns
- -------
- MobuSlackException
- Converted exception.
- """
- new_exc = cls(
- msg=exc.message,
- user=exc.user,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- event=event,
- monkey=monkey,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- image = self.annotations.get("image")
- if image:
- fields.append(SlackTextField(heading="Image", text=image))
- return fields
-
- @override
- def to_slack(self) -> SlackMessage:
- """Format the error as a Slack Block Kit message.
-
- This is the generic version that only reports the text of the
- exception and common fields. Most classes will want to override it.
-
- Returns
- -------
- SlackMessage
- Formatted Slack message.
- """
- return SlackMessage(
- message=str(self),
- blocks=self.common_blocks(),
- fields=self.common_fields(),
- )
-
-
-class MobuSlackWebException(
- ne.NubladoClientSlackWebException, MobuSlackException
-):
- """Represents a web exception that can be reported to Slack.
-
- Similar to `MobuSlackException`, this adds some additional fields to
- `~rubin.nublado.client.SlackWebException` but is otherwise equivalent. It
- is intended to be subclassed. Subclasses may want to override the
- `to_slack` method.
- """
-
- @override
- def common_blocks(self) -> list[SlackBaseBlock]:
- blocks = MobuSlackException.common_blocks(self)
- if self.url:
- text = f"{self.method} {self.url}" if self.method else self.url
- blocks.append(SlackTextBlock(heading="URL", text=text))
- return blocks
-
-
-class NotebookRepositoryError(MobuSlackException):
+class NotebookRepositoryError(Exception):
"""The repository containing notebooks to run is not valid."""
-class RepositoryConfigError(MobuSlackException):
+class RepositoryConfigError(Exception):
"""The in-repo mobu.yaml config file is invalid."""
- def __init__(
- self,
- *,
- err: Exception,
- user: str,
- repo_url: str,
- repo_ref: str,
- config_file: Path,
- ) -> None:
- super().__init__(str(err), user)
- self.err = err
- self.user = user
- self.repo_url = repo_url
- self.repo_ref = repo_ref
- self.config_file = config_file
-
- @override
- def to_slack(self) -> SlackMessage:
- message = super().to_slack()
- message.blocks += [
- SlackTextBlock(heading="GitHub Repository", text=self.repo_url),
- SlackTextBlock(heading="Git Ref", text=self.repo_ref),
- ]
- message.attachments += [
- SlackCodeBlock(
- heading="Error",
- code=f"{type(self.err).__name__}: {self.err!s}",
- )
- ]
- message.message = f"Error parsing config file: {self.config_file}"
- return message
-
-
-class CodeExecutionError(ne.CodeExecutionError, MobuMixin):
- """Error generated by code execution in a notebook on JupyterLab."""
-
- def __init__(
- self,
- *,
- user: str,
- code: str | None = None,
- code_type: str = "code",
- error: str | None = None,
- status: str | None = None,
- monkey: str | None = None,
- event: str | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> None:
- super().__init__(
- user=user,
- code=code,
- code_type=code_type,
- error=error,
- status=status,
- started_at=started_at,
- failed_at=failed_at,
- )
- self.mobu_init(monkey=monkey, event=event)
-
- def __str__(self) -> str:
- if self.annotations.get("notebook"):
- notebook = self.annotations["notebook"]
- if self.annotations.get("cell"):
- cell = self.annotations["cell"]
- msg = f"{self.user}: cell {cell} of notebook {notebook} failed"
- else:
- msg = f"{self.user}: cell of notebook {notebook} failed"
- if self.status:
- msg += f" (status: {self.status})"
- if self.code:
- msg += f"\nCode: {self.code}"
- elif self.code:
- msg = f"{self.user}: running {self.code_type} '{self.code}' failed"
- else:
- msg = f"{self.user}: running {self.code_type} failed"
- if self.error:
- msg += f"\nError: {_remove_ansi_escapes(self.error)}"
- return msg
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- return fields
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.CodeExecutionError,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- CodeExecutionError
- Converted exception.
- """
- new_exc = cls(
- user=exc.user or "",
- code=exc.code,
- code_type=exc.code_type,
- error=exc.error,
- status=exc.status,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- monkey=monkey,
- event=event,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
class GitHubFileNotFoundError(Exception):
"""Tried to retrieve contents for a non-existent file in a GitHub
@@ -475,539 +108,20 @@ class GitHubFileNotFoundError(Exception):
"""
-class JupyterProtocolError(ne.JupyterProtocolError, MobuMixin):
- """Some error occurred when talking to JupyterHub or JupyterLab."""
-
- def __init__(
- self,
- msg: str,
- user: str | None = None,
- *,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- monkey: str | None = None,
- event: str | None = None,
- ) -> None:
- super().__init__(
- msg=msg, user=user, started_at=started_at, failed_at=failed_at
- )
- self.mobu_init(monkey=monkey, event=event)
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.JupyterProtocolError,
- event: str | None = None,
- monkey: str | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- annotations: dict[str, str] | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- JupyterProtocolError
- Converted exception.
- """
- new_exc = cls(
- msg=exc.message,
- user=exc.user,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- monkey=monkey,
- event=event,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- return fields
-
-
-class JupyterSpawnError(ne.JupyterSpawnError, MobuMixin):
- """The Jupyter Lab pod failed to spawn."""
-
- def __init__(
- self,
- log: str,
- user: str,
- message: str | None = None,
- monkey: str | None = None,
- event: str | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> None:
- if message:
- message = f"Spawning lab failed: {message}"
- else:
- message = "Spawning lab failed"
- super().__init__(
- message, user, started_at=started_at, failed_at=failed_at
- )
- self.log = log
- self.mobu_init(monkey=monkey, event=event)
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.JupyterSpawnError,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- JupyterSpawnError
- Converted exception.
- """
- new_exc = cls(
- log=exc.log,
- user=exc.user or "",
- message=exc.message,
- monkey=monkey,
- event=event,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
- @classmethod
- def from_exception(
- cls,
- log: str,
- exc: Exception,
- user: str,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- *,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- ) -> Self:
- """Convert from an arbitrary exception to a spawn error.
-
- Parameters
- ----------
- log
- Log of the spawn to this point.
- exc
- Exception that terminated the spawn attempt.
- user
- Username of the user spawning the lab.
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- JupyterSpawnError
- Converted exception.
- """
- client_exc = super().from_exception(log, exc, user)
- new_exc = cls.from_client_exception(
- client_exc,
- monkey=monkey,
- event=event,
- started_at=started_at or client_exc.started_at,
- failed_at=failed_at or client_exc.failed_at,
- )
- new_exc.annotations.update(client_exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- return fields
-
-
-class JupyterTimeoutError(ne.JupyterTimeoutError, MobuMixin):
- """Timed out waiting for the lab to spawn."""
-
- def __init__(
- self,
- msg: str,
- user: str,
- log: str | None = None,
- *,
- monkey: str | None = None,
- event: str | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> None:
- super().__init__(msg, user, started_at=started_at, failed_at=failed_at)
- self.log = log
- self.mobu_init(monkey=monkey, event=event)
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- return fields
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.JupyterTimeoutError,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- JupyterTimeoutError
- Converted exception.
- """
- new_exc = cls(
- log=exc.log,
- user=exc.user or "",
- msg=exc.message,
- monkey=monkey,
- event=event,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
-
-class JupyterWebError(ne.JupyterWebError, MobuMixin):
- """An error occurred when talking to JupyterHub or a Jupyter lab."""
-
- def __init__(
- self,
- msg: str,
- user: str | None = None,
- *,
- monkey: str | None = None,
- event: str | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> None:
- super().__init__(
- message=msg, user=user, started_at=started_at, failed_at=failed_at
- )
- self.mobu_init(monkey=monkey, event=event)
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.JupyterWebError,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- JupyterWebError
- Converted exception.
- """
- new_exc = cls(
- msg=exc.message,
- user=exc.user,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- monkey=monkey,
- event=event,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- new_exc.event = event
- new_exc.method = exc.method
- new_exc.url = exc.url
- new_exc.body = exc.body
- return new_exc
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- return fields
-
-
-class JupyterWebSocketError(ne.JupyterWebSocketError, MobuMixin):
- """An error occurred talking to the Jupyter lab WebSocket."""
-
- def __init__(
- self,
- msg: str,
- *,
- user: str,
- code: int | None = None,
- reason: str | None = None,
- status: int | None = None,
- body: bytes | None = None,
- monkey: str | None = None,
- event: str | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> None:
- super().__init__(
- msg=msg,
- user=user,
- code=code,
- reason=reason,
- status=status,
- started_at=started_at,
- failed_at=failed_at,
- body=body,
- )
- self.mobu_init(monkey=monkey, event=event)
-
- @override
- def to_slack(self) -> SlackMessage:
- """Format this exception as a Slack notification.
-
- Returns
- -------
- SlackMessage
- Formatted message.
- """
- message = super().to_slack()
-
- if self.reason:
- reason = self.reason
- if self.code:
- reason = f"{self.reason} ({self.code})"
- else:
- reason = self.reason
- field = SlackTextField(heading="Reason", text=reason)
- message.fields.append(field)
- elif self.code:
- field = SlackTextField(heading="Code", text=str(self.code))
- message.fields.append(field)
-
- if self.body:
- block = SlackTextBlock(heading="Body", text=self.body)
- message.blocks.append(block)
-
- return message
-
- @override
- def common_fields(self) -> list[SlackBaseField]:
- """Return common fields to put in any alert.
-
- Returns
- -------
- list of SlackBaseField
- Common fields to add to the Slack message.
- """
- fields = super().common_fields()
- fields.extend(self.mobu_fields())
- return fields
-
- @classmethod
- def from_client_exception(
- cls,
- exc: ne.JupyterWebSocketError,
- monkey: str | None = None,
- event: str | None = None,
- annotations: dict[str, str] | None = None,
- started_at: datetime.datetime | None = None,
- failed_at: datetime.datetime | None = None,
- ) -> Self:
- """
- Add Mobu-specific fields to exception from NubladoClient layer.
-
- Parameters
- ----------
- exc
- Original exception
- monkey
- Monkey spawning the lab, if known.
- event
- Event (from mobu's perspective) spawning the lab, if known.
- annotations
- Additional annotations
- started_at
- Timestamp for beginning of operation that caused the exception,
- if known.
- failed_at
- Timestamp for failure of operation that caused the exception,
- if known (defaults to the current time).
-
- Returns
- -------
- JupyterWebSocketError
- Converted exception.
- """
- body = exc.body
- if body is not None:
- body_bytes = body.encode()
- new_exc = cls(
- msg=exc.message,
- user=exc.user or "",
- code=exc.code,
- reason=exc.reason,
- status=exc.status,
- body=body_bytes,
- monkey=monkey,
- event=event,
- started_at=started_at or exc.started_at,
- failed_at=failed_at or exc.failed_at,
- )
- new_exc.annotations.update(exc.annotations or {})
- new_exc.annotations.update(annotations or {})
- return new_exc
-
-
-class TAPClientError(MobuSlackException):
- """Creating a TAP client failed."""
-
- def __init__(self, exc: Exception, *, user: str) -> None:
- if str(exc):
- error = f"{type(exc).__name__}: {exc!s}"
- else:
- error = type(exc).__name__
- msg = f"Unable to create TAP client: {error}"
- super().__init__(msg, user)
-
-
-class SubprocessError(MobuSlackException):
+class SubprocessError(SentryException):
"""Running a subprocess failed."""
def __init__(
self,
msg: str,
*,
- user: str | None = None,
returncode: int | None = None,
stdout: str | None = None,
stderr: str | None = None,
cwd: Path | None = None,
env: dict[str, str] | None = None,
) -> None:
- super().__init__(msg, user)
+ super().__init__(msg)
self.msg = msg
self.returncode = returncode
self.stdout = stdout
@@ -1015,6 +129,14 @@ def __init__(
self.cwd = cwd
self.env = env
+ self.contexts["subprocess_info"] = {
+ "return_code": str(self.returncode),
+ "stdout": self.stdout,
+ "stderr": self.stderr,
+ "directory": str(self.cwd),
+ "env": self.env,
+ }
+
def __str__(self) -> str:
return (
f"{self.msg} with rc={self.returncode};"
@@ -1022,74 +144,55 @@ def __str__(self) -> str:
f" cwd='{self.cwd}'; env='{self.env}'"
)
- @override
- def to_slack(self) -> SlackMessage:
- """Format this exception as a Slack notification.
- Returns
- -------
- SlackMessage
- Formatted message.
- """
- message = SlackMessage(
- message=self.msg,
- blocks=self.common_blocks(),
- fields=self.common_fields(),
- )
-
- field = SlackTextField(
- heading="Return Code", text=str(self.returncode)
- )
- message.fields.append(field)
- if self.cwd:
- message.fields.append(field)
- field = SlackTextField(heading="Directory", text=str(self.cwd))
- if self.stdout:
- block = SlackCodeBlock(heading="Stdout", code=self.stdout)
- message.blocks.append(block)
- if self.stderr:
- block = SlackCodeBlock(heading="Stderr", code=self.stderr)
- message.blocks.append(block)
- if self.env:
- block = SlackCodeBlock(
- heading="Environment",
- code=json.dumps(self.env, sort_keys=True, indent=4),
- )
- message.blocks.append(block)
- return message
-
-
-class ComparisonError(MobuSlackException):
+class ComparisonError(SentryException):
"""Comparing two strings failed."""
def __init__(
self,
- user: str | None = None,
*,
expected: str,
received: str,
) -> None:
- super().__init__("Comparison failed", user)
+ super().__init__("Comparison failed")
self.expected = expected
self.received = received
+ self.contexts["comparison_info"] = {
+ "expected": self.expected,
+ "received": self.received,
+ }
+
def __str__(self) -> str:
return (
f"Comparison failed: expected '{self.expected}', but"
f" received '{self.received}'"
)
- def to_slack(self) -> SlackMessage:
- """Format this exception as a Slack notification.
- Returns
- -------
- SlackMessage
- Formatted message.
- """
- message = super().to_slack()
- field = SlackTextField(heading="Expected", text=self.expected)
- message.fields.append(field)
- field = SlackTextField(heading="Received", text=self.received)
- message.fields.append(field)
- return message
+class JupyterSpawnTimeoutError(Exception):
+ """Timed out waiting for the lab to spawn."""
+
+
+class JupyterDeleteTimeoutError(Exception):
+ """Timed out waiting for a lab to delete."""
+
+
+class JupyterSpawnError(Exception):
+ """The Jupyter Lab pod failed to spawn."""
+
+
+class NotebookCellExecutionError(Exception):
+ """Error when executing a notebook cell."""
+
+
+class TAPClientError(Exception):
+ """Creating a TAP client failed."""
+
+ def __init__(self, exc: Exception) -> None:
+ if str(exc):
+ error = f"{type(exc).__name__}: {exc!s}"
+ else:
+ error = type(exc).__name__
+ msg = f"Unable to create TAP client: {error}"
+ super().__init__(msg)
diff --git a/src/mobu/main.py b/src/mobu/main.py
index 0c0b4dca..5f210574 100644
--- a/src/mobu/main.py
+++ b/src/mobu/main.py
@@ -23,6 +23,8 @@
from safir.middleware.x_forwarded import XForwardedMiddleware
from safir.slack.webhook import SlackRouteErrorHandler
+from mobu.sentry import sentry_init
+
from .asyncio import schedule_periodic
from .dependencies.config import config_dependency
from .dependencies.context import context_dependency
@@ -88,6 +90,12 @@ def create_app(*, load_config: bool = True) -> FastAPI:
required but the configuration won't matter.
"""
if load_config:
+ sentry_init(
+ dsn=config_dependency.config.sentry_dsn,
+ env=config_dependency.config.sentry_environment,
+ traces_sample_config=config_dependency.config.sentry_traces_sample_config,
+ )
+
config = config_dependency.config
path_prefix = config.path_prefix
github_ci_app = config.github_ci_app
diff --git a/src/mobu/models/business/base.py b/src/mobu/models/business/base.py
index f2e36825..f230032f 100644
--- a/src/mobu/models/business/base.py
+++ b/src/mobu/models/business/base.py
@@ -6,8 +6,6 @@
from pydantic import BaseModel, ConfigDict, Field, PlainSerializer
from safir.logging import LogLevel
-from ..timings import StopwatchData
-
__all__ = [
"BusinessConfig",
"BusinessData",
@@ -89,6 +87,4 @@ class BusinessData(BaseModel):
..., title="If the business is currently in the process of refreshing"
)
- timings: list[StopwatchData] = Field(..., title="Timings of events")
-
model_config = ConfigDict(extra="forbid")
diff --git a/src/mobu/sentry.py b/src/mobu/sentry.py
new file mode 100644
index 00000000..ae1dac50
--- /dev/null
+++ b/src/mobu/sentry.py
@@ -0,0 +1,126 @@
+"""Helpers for sentry instrumentation."""
+
+from __future__ import annotations
+
+from collections.abc import Generator
+from contextlib import contextmanager
+from typing import Any, Literal
+
+import sentry_sdk
+from safir.sentry import before_send_handler
+from sentry_sdk.tracing import Span, Transaction
+from sentry_sdk.types import Event, Hint
+
+from mobu.constants import SENTRY_ERRORED_KEY
+
+__all__ = [
+ "before_send",
+ "capturing_start_span",
+ "fingerprint",
+ "start_transaction",
+]
+
+
+def fingerprint(event: Event) -> list[str]:
+ """Generate a fingerprint to force separate issues for tag combos."""
+ fingerprint = event.get("fingerprint", [])
+ return [
+ *fingerprint,
+ "{{ tags.flock }}",
+ "{{ tags.business }}",
+ "{{ tags.notebook }}",
+ "{{ tags.cell }}",
+ ]
+
+
+def before_send(event: Event, hint: Hint) -> Event | None:
+ """Add tags to fingerprint so that distinct issues are created."""
+ event["fingerprint"] = fingerprint(event)
+ return before_send_handler(event, hint)
+
+
+@contextmanager
+def capturing_start_span(op: str, **kwargs: Any) -> Generator[Span]:
+ """Start a span, set the op/start time in the context, and capture errors.
+
+ Setting the op and start time in the context will propagate it to any error
+    events that get sent to Sentry, even if the trace does not get sent.
+
+ Explicitly capturing errors in the span will tie the Sentry events to this
+ specific span, rather than tying them to the span/transaction where they
+ would be handled otherwise.
+ """
+ with sentry_sdk.start_span(op=op, **kwargs) as span:
+ sentry_sdk.get_isolation_scope().set_context(
+ "phase", {"phase": op, "started_at": span.start_timestamp}
+ )
+ sentry_sdk.get_isolation_scope().set_tag("phase", op)
+
+ # You can't see the time a span started in the Sentry UI, only the time
+ # the entire transaction started
+ span.set_tag("started_at", span.start_timestamp)
+
+ try:
+ yield span
+ except Exception as e:
+ # Even though we're capturing exceptions at the business level,
+ # Sentry knows not to send them twice.
+ sentry_sdk.capture_exception(e)
+ raise
+ finally:
+ sentry_sdk.get_isolation_scope().remove_context("phase")
+ sentry_sdk.get_isolation_scope().remove_tag("phase")
+
+
+@contextmanager
+def start_transaction(
+ name: str, op: str, **kwargs: Any
+) -> Generator[Transaction | Span]:
+ """Start a transaction and mark it if an exception is raised."""
+ with sentry_sdk.start_transaction(
+ name=name, op=op, **kwargs
+ ) as transaction:
+ try:
+ yield transaction
+ except Exception:
+ transaction.set_tag(SENTRY_ERRORED_KEY, True)
+ raise
+
+
+@contextmanager
+def capturing_isolation_scope() -> Generator:
+ """Run in a new isolation scope and capture any uncaught errors."""
+ with sentry_sdk.isolation_scope():
+ try:
+ yield
+ except Exception as exc:
+ sentry_sdk.capture_exception(exc)
+ raise
+
+
+def send_all_error_transactions(event: Event, _: Hint) -> Event | None:
+ """Send the transaction if an exception was raised during it."""
+ if event.get("tags", {}).get(SENTRY_ERRORED_KEY, False):
+ return event
+ return None
+
+
+def sentry_init(
+ dsn: str | None, env: str, traces_sample_config: float | Literal["errors"]
+) -> None:
+ """Initialize Sentry with different functionality based on env vars."""
+ if traces_sample_config == "errors":
+ sentry_sdk.init(
+ dsn=dsn,
+ environment=env,
+ before_send=before_send,
+ traces_sample_rate=1,
+ before_send_transaction=send_all_error_transactions,
+ )
+ else:
+ sentry_sdk.init(
+ dsn=dsn,
+ environment=env,
+ before_send=before_send,
+ traces_sample_rate=traces_sample_config,
+ )
diff --git a/src/mobu/services/business/base.py b/src/mobu/services/business/base.py
index 2a890c56..e11ed146 100644
--- a/src/mobu/services/business/base.py
+++ b/src/mobu/services/business/base.py
@@ -18,7 +18,7 @@
from ...events import Events
from ...models.business.base import BusinessData, BusinessOptions
from ...models.user import AuthenticatedUser
-from ..timings import Timings
+from ...sentry import capturing_start_span, start_transaction
T = TypeVar("T", bound="BusinessOptions")
U = TypeVar("U")
@@ -88,12 +88,12 @@ class Business(Generic[T], metaclass=ABCMeta):
Number of successes.
failure_count
Number of failures.
- timings
- Execution timings.
stopping
Whether `stop` has been called and further execution should stop.
flock
Flock that is running this business, if it is running in a flock.
+ name
+        The name of this kind of business.
"""
def __init__(
@@ -113,11 +113,11 @@ def __init__(
self.logger = logger
self.success_count = 0
self.failure_count = 0
- self.timings = Timings()
self.control: Queue[BusinessCommand] = Queue()
self.stopping = False
self.refreshing = False
self.flock = flock
+ self.name = type(self).__name__
# Methods that should be overridden by child classes if needed.
@@ -155,7 +155,11 @@ async def run(self) -> None:
self.logger.info("Starting up...")
try:
try:
- await self.startup()
+ with start_transaction(
+ name=f"{self.name} - startup",
+ op=f"mobu.{self.name}.startup",
+ ):
+ await self.startup()
except Exception:
# Strictly speaking, this is not an iteration, but unless we
# count startup failure as a failed iteration, a business that
@@ -175,7 +179,12 @@ async def run(self) -> None:
await self.idle()
self.logger.info("Shutting down...")
- await self.shutdown()
+
+ with start_transaction(
+ name=f"{self.name} - shutdown",
+ op=f"mobu.{self.name}.shutdown",
+ ):
+ await self.shutdown()
await self.close()
finally:
# Tell the control channel we've processed the stop command.
@@ -199,7 +208,7 @@ async def run_once(self) -> None:
async def idle(self) -> None:
"""Pause at the end of each business loop."""
self.logger.info("Idling...")
- with self.timings.start("idle"):
+ with capturing_start_span(op="idle"):
await self.pause(self.options.idle_time)
async def error_idle(self) -> None:
@@ -320,11 +329,10 @@ async def iter_next() -> U:
def dump(self) -> BusinessData:
return BusinessData(
- name=type(self).__name__,
+ name=self.name,
failure_count=self.failure_count,
success_count=self.success_count,
refreshing=self.refreshing,
- timings=self.timings.dump(),
)
def common_event_attrs(self) -> CommonEventAttrs:
@@ -332,7 +340,7 @@ def common_event_attrs(self) -> CommonEventAttrs:
return {
"flock": self.flock,
"username": self.user.username,
- "business": self.__class__.__name__,
+ "business": self.name,
}
async def _pause_no_return(self, interval: timedelta) -> None:
diff --git a/src/mobu/services/business/gitlfs.py b/src/mobu/services/business/gitlfs.py
index c36874d7..976b6d80 100644
--- a/src/mobu/services/business/gitlfs.py
+++ b/src/mobu/services/business/gitlfs.py
@@ -8,12 +8,14 @@
from urllib.parse import urlparse
from httpx import AsyncClient
+from safir.sentry import duration
from structlog.stdlib import BoundLogger
from ...events import Events, GitLfsCheck
from ...exceptions import ComparisonError
from ...models.business.gitlfs import GitLFSBusinessOptions
from ...models.user import AuthenticatedUser
+from ...sentry import capturing_start_span, start_transaction
from ...storage.git import Git
from .base import Business
@@ -75,21 +77,26 @@ async def execute(self) -> None:
Each time through the loop, the entire set of repositories is
created anew.
"""
- self.logger.info("Running Git-LFS check...")
- event = GitLfsCheck(success=False, **self.common_event_attrs())
- try:
- with self.timings.start("execute git-lfs check") as sw:
- await self._git_lfs_check()
- elapsed = sw.elapsed.total_seconds()
- self.logger.info(f"...Git-LFS check finished after {elapsed}s")
+ with start_transaction(
+ name=f"{self.name} - execute",
+ op="mobu.gitlfs.check",
+ ):
+ self.logger.info("Running Git-LFS check...")
+ event = GitLfsCheck(success=False, **self.common_event_attrs())
+ try:
+ with capturing_start_span(op="mobu.gitlfs.check") as span:
+ await self._git_lfs_check()
+ span_duration = duration(span)
+ elapsed = span_duration.total_seconds()
+ self.logger.info(f"...Git-LFS check finished after {elapsed}s")
- event.duration = sw.elapsed
- event.success = True
- except:
- event.success = False
- raise
- finally:
- await self.events.git_lfs_check.publish(event)
+ event.duration = span_duration
+ event.success = True
+ except:
+ event.success = False
+ raise
+ finally:
+ await self.events.git_lfs_check.publish(event)
def _git(self, repo: Path) -> Git:
"""Return a configured Git client for a specified repo path.
@@ -107,26 +114,26 @@ async def _git_lfs_check(self) -> None:
self._uuid = uuid.uuid4().hex
with tempfile.TemporaryDirectory() as working_dir:
self._working_dir = Path(working_dir)
- with self.timings.start("create origin repo"):
+ with capturing_start_span(op="create origin repo"):
await self._create_origin_repo()
- with self.timings.start("populate origin repo"):
+ with capturing_start_span(op="populate origin repo"):
await self._populate_origin_repo()
- with self.timings.start("create checkout repo"):
+ with capturing_start_span(op="create checkout repo"):
await self._create_checkout_repo()
- with self.timings.start("add LFS-tracked assets"):
+ with capturing_start_span(op="add LFS-tracked assets"):
await self._add_lfs_assets()
git = self._git(repo=Path(self._working_dir / "checkout"))
- with self.timings.start("add git credentials"):
+ with capturing_start_span(op="add git credentials"):
await self._add_credentials(git)
- with self.timings.start("push LFS-tracked assets"):
+ with capturing_start_span(op="push LFS-tracked assets"):
await git.push("origin", "main")
- with self.timings.start("remove git credentials"):
+ with capturing_start_span(op="remove git credentials"):
Path(self._working_dir / ".git_credentials").unlink()
- with self.timings.start("verify origin contents"):
+ with capturing_start_span(op="verify origin contents"):
await self._verify_origin_contents()
- with self.timings.start("create clone repo with asset"):
+ with capturing_start_span(op="create clone repo with asset"):
await self._create_clone_repo()
- with self.timings.start("verify asset contents"):
+ with capturing_start_span(op="verify asset contents"):
await self._verify_asset_contents()
async def _create_origin_repo(self) -> None:
@@ -179,9 +186,9 @@ async def _populate_origin_repo(self) -> None:
async def _add_lfs_assets(self) -> None:
checkout_path = Path(self._working_dir / "checkout")
git = self._git(repo=checkout_path)
- with self.timings.start("install git lfs to checkout repo"):
+ with capturing_start_span(op="install git lfs to checkout repo"):
await self._install_git_lfs(git)
- with self.timings.start("add lfs data to checkout repo"):
+ with capturing_start_span(op="add lfs data to checkout repo"):
await self._add_git_lfs_data(git)
asset_path = Path(checkout_path / "assets")
asset_path.mkdir()
@@ -208,7 +215,7 @@ async def _install_git_lfs(self, git: Git, scope: str = "--local") -> None:
async def _add_git_lfs_data(self, git: Git) -> None:
if git.repo is None:
raise ValueError("Git client repository cannot be 'None'")
- with self.timings.start("git attribute installation"):
+ with capturing_start_span(op="git attribute installation"):
shutil.copyfile(
Path(self._package_data / "gitattributes"),
Path(git.repo / ".gitattributes"),
diff --git a/src/mobu/services/business/notebookrunner.py b/src/mobu/services/business/notebookrunner.py
index dadf7559..c905f311 100644
--- a/src/mobu/services/business/notebookrunner.py
+++ b/src/mobu/services/business/notebookrunner.py
@@ -6,26 +6,36 @@
from __future__ import annotations
+import contextlib
import json
import random
import shutil
-from collections.abc import AsyncIterator
+from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
+import sentry_sdk
import yaml
from httpx import AsyncClient
from rubin.nublado.client import JupyterLabSession
+from rubin.nublado.client.exceptions import CodeExecutionError
from rubin.nublado.client.models import CodeContext
+from safir.sentry import duration
+from sentry_sdk import set_context, set_tag
+from sentry_sdk.tracing import Span, Transaction
from structlog.stdlib import BoundLogger
from ...constants import GITHUB_REPO_CONFIG_PATH
from ...dependencies.config import config_dependency
from ...events import Events, NotebookCellExecution, NotebookExecution
-from ...exceptions import NotebookRepositoryError, RepositoryConfigError
+from ...exceptions import (
+ NotebookCellExecutionError,
+ NotebookRepositoryError,
+ RepositoryConfigError,
+)
from ...models.business.notebookrunner import (
ListNotebookRunnerOptions,
NotebookFilterResults,
@@ -35,6 +45,7 @@
)
from ...models.repo import RepoConfig
from ...models.user import AuthenticatedUser
+from ...sentry import capturing_start_span, start_transaction
from ...services.business.base import CommonEventAttrs
from ...storage.git import Git
from .nublado import NubladoBusiness
@@ -42,7 +53,9 @@
__all__ = ["NotebookRunner"]
-class CommonNotebookEventAttrs(CommonEventAttrs):
+class _CommonNotebookEventAttrs(CommonEventAttrs):
+ """Common notebook event attributes."""
+
notebook: str
repo: str
repo_ref: str
@@ -103,14 +116,6 @@ def __init__(
case ListNotebookRunnerOptions(notebooks_to_run=notebooks_to_run):
self._notebooks_to_run = notebooks_to_run
- def annotations(self, cell_id: str | None = None) -> dict[str, str]:
- result = super().annotations()
- if self._notebook:
- result["notebook"] = self._notebook.name
- if cell_id:
- result["cell"] = cell_id
- return result
-
async def startup(self) -> None:
await self.initialize()
await super().startup()
@@ -132,6 +137,15 @@ async def initialize(self) -> None:
await self.clone_repo()
repo_config_path = self._repo_dir / GITHUB_REPO_CONFIG_PATH
+ set_context(
+ "repo_info",
+ {
+ "repo_url": self.options.repo_url,
+ "repo_ref": self.options.repo_ref,
+ "repo_hash": self._repo_hash,
+ "repo_config_file": GITHUB_REPO_CONFIG_PATH,
+ },
+ )
if repo_config_path.exists():
try:
repo_config = RepoConfig.model_validate(
@@ -139,11 +153,7 @@ async def initialize(self) -> None:
)
except Exception as err:
raise RepositoryConfigError(
- err=err,
- user=self.user.username,
- config_file=GITHUB_REPO_CONFIG_PATH,
- repo_url=self.options.repo_url,
- repo_ref=self.options.repo_ref,
+ f"Error parsing config file: {GITHUB_REPO_CONFIG_PATH}"
) from err
else:
repo_config = RepoConfig()
@@ -151,6 +161,9 @@ async def initialize(self) -> None:
exclude_dirs = repo_config.exclude_dirs
self._exclude_paths = {self._repo_dir / path for path in exclude_dirs}
self._notebooks = self.find_notebooks()
+ set_context(
+ "notebook_filter_info", self._notebooks.model_dump(mode="json")
+ )
self.logger.info("Repository cloned and ready")
async def shutdown(self) -> None:
@@ -166,7 +179,7 @@ async def refresh(self) -> None:
async def clone_repo(self) -> None:
url = self.options.repo_url
ref = self.options.repo_ref
- with self.timings.start("clone_repo"):
+ with capturing_start_span(op="clone_repo"):
self._git.repo = self._repo_dir
await self._git.clone(url, str(self._repo_dir))
await self._git.checkout(ref)
@@ -196,7 +209,7 @@ def missing_services(self, notebook: Path) -> bool:
return False
def find_notebooks(self) -> NotebookFilterResults:
- with self.timings.start("find_notebooks"):
+ with capturing_start_span(op="find_notebooks"):
if self._repo_dir is None:
raise NotebookRepositoryError(
"Repository directory must be set", self.user.username
@@ -260,9 +273,7 @@ def next_notebook(self) -> Path:
def read_notebook_metadata(self, notebook: Path) -> NotebookMetadata:
"""Extract mobu-specific metadata from a notebook."""
- with self.timings.start(
- "read_notebook_metadata", {"notebook": notebook.name}
- ):
+ with capturing_start_span(op="read_notebook_metadata"):
try:
notebook_text = notebook.read_text()
notebook_json = json.loads(notebook_text)
@@ -273,7 +284,7 @@ def read_notebook_metadata(self, notebook: Path) -> NotebookMetadata:
raise NotebookRepositoryError(msg, self.user.username) from e
def read_notebook(self, notebook: Path) -> list[dict[str, Any]]:
- with self.timings.start("read_notebook", {"notebook": notebook.name}):
+ with capturing_start_span(op="read_notebook"):
try:
notebook_text = notebook.read_text()
cells = json.loads(notebook_text)["cells"]
@@ -314,45 +325,66 @@ async def execute_code(self, session: JupyterLabSession) -> None:
if self.refreshing:
await self.refresh()
return
+ await self.execute_notebook(session, count, num_executions)
- self._notebook = self.next_notebook()
-
- iteration = f"{count + 1}/{num_executions}"
- msg = f"Notebook {self._notebook.name} iteration {iteration}"
- self.logger.info(msg)
-
- with self.timings.start(
- "execute_notebook", self.annotations(self._notebook.name)
- ) as sw:
- try:
- for cell in self.read_notebook(self._notebook):
- code = "".join(cell["source"])
- cell_id = cell.get("id") or cell["_index"]
- ctx = CodeContext(
- notebook=self._notebook.name,
- path=str(self._notebook),
- cell=cell_id,
- cell_number=f"#{cell['_index']}",
- cell_source=code,
- )
- await self.execute_cell(session, code, cell_id, ctx)
- if not await self.execution_idle():
- break
- except:
- await self._publish_notebook_event(
- duration=sw.elapsed, success=False
- )
- raise
-
- self.logger.info(f"Success running notebook {self._notebook.name}")
- await self._publish_notebook_event(
- duration=sw.elapsed, success=True
- )
- if not self._notebook_paths:
- self.logger.info("Done with this cycle of notebooks")
if self.stopping:
break
+ @contextlib.contextmanager
+ def trace_notebook(
+ self, notebook: str, iteration: str
+ ) -> Iterator[Transaction | Span]:
+ """Set up tracing context for executing a notebook."""
+ notebook_info = {"notebook": notebook, "iteration": iteration}
+ with start_transaction(
+ name=f"{self.name} - Execute notebook",
+ op="mobu.notebookrunner.execute_notebook",
+ ) as span:
+ set_tag("notebook", notebook)
+ set_context("notebook_info", notebook_info)
+ yield span
+
+ async def execute_notebook(
+ self, session: JupyterLabSession, count: int, num_executions: int
+ ) -> None:
+ self._notebook = self.next_notebook()
+ relative_notebook = str(
+ self._notebook.relative_to(self._repo_dir or "/")
+ )
+ iteration = f"{count + 1}/{num_executions}"
+ msg = f"Notebook {self._notebook.name} iteration {iteration}"
+ self.logger.info(msg)
+
+ with self.trace_notebook(
+ notebook=relative_notebook, iteration=iteration
+ ) as span:
+ try:
+ for cell in self.read_notebook(self._notebook):
+ code = "".join(cell["source"])
+ cell_id = cell.get("id") or cell["_index"]
+ ctx = CodeContext(
+ notebook=relative_notebook,
+ path=str(self._notebook),
+ cell=cell_id,
+ cell_number=f"#{cell['_index']}",
+ cell_source=code,
+ )
+ await self.execute_cell(session, code, cell_id, ctx)
+ if not await self.execution_idle():
+ break
+ except:
+ await self._publish_notebook_event(
+ duration=duration(span), success=False
+ )
+ raise
+
+ self.logger.info(f"Success running notebook {self._notebook.name}")
+ await self._publish_notebook_event(
+ duration=duration(span), success=True
+ )
+ if not self._notebook_paths:
+ self.logger.info("Done with this cycle of notebooks")
+
async def _publish_notebook_event(
self, duration: timedelta, *, success: bool
) -> None:
@@ -376,7 +408,7 @@ async def _publish_cell_event(
)
)
- def common_notebook_event_attrs(self) -> CommonNotebookEventAttrs:
+ def common_notebook_event_attrs(self) -> _CommonNotebookEventAttrs:
"""Return notebook event attrs with the other common attrs."""
notebook = self._notebook.name if self._notebook else "unknown"
return {
@@ -392,28 +424,49 @@ async def execute_cell(
session: JupyterLabSession,
code: str,
cell_id: str,
- context: CodeContext | None = None,
+ context: CodeContext,
) -> None:
if not self._notebook:
raise RuntimeError("Executing a cell without a notebook")
self.logger.info(f"Executing cell {cell_id}:\n{code}\n")
- with self.timings.start(
- "execute_cell", self.annotations(cell_id)
- ) as sw:
+ set_tag("cell", cell_id)
+ cell_info = {
+ "code": code,
+ "cell_id": cell_id,
+ "cell_number": context.cell_number,
+ }
+ set_context("cell_info", cell_info)
+ with capturing_start_span(op="execute_cell") as span:
+ # The scope context only appears on the transaction, and not on
+ # individual spans. Since the cell info will be different for
+ # different spans, we need to set this data directly on the span.
+ # We have to set it in the context too so that it shows up in any
+            # exception events. Unfortunately, span data is not included in
+ # exception events.
+ span.set_data("cell_info", cell_info)
self._running_code = code
try:
reply = await session.run_python(code, context=context)
- except:
+ except Exception as e:
+ if isinstance(e, CodeExecutionError) and e.error:
+ sentry_sdk.get_current_scope().add_attachment(
+ filename="nublado_error.txt",
+ bytes=self.remove_ansi_escapes(e.error).encode(),
+ )
await self._publish_cell_event(
cell_id=cell_id,
- duration=sw.elapsed,
+ duration=duration(span),
success=False,
)
- raise
+
+ notebook = getattr(context, "notebook", " NotebookRunnerData:
diff --git a/src/mobu/services/business/nublado.py b/src/mobu/services/business/nublado.py
index cc12a864..e73038c9 100644
--- a/src/mobu/services/business/nublado.py
+++ b/src/mobu/services/business/nublado.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import re
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
@@ -10,21 +11,21 @@
from random import SystemRandom
from typing import Generic, TypeVar
-import rubin.nublado.client.exceptions as ne
+import sentry_sdk
from httpx import AsyncClient
from rubin.nublado.client import JupyterLabSession, NubladoClient
from safir.datetime import current_datetime, format_datetime_for_logging
-from safir.slack.blockkit import SlackException
+from safir.sentry import duration
+from sentry_sdk import set_tag
+from sentry_sdk.tracing import Span
from structlog.stdlib import BoundLogger
from ...dependencies.config import config_dependency
from ...events import Events, NubladoDeleteLab, NubladoSpawnLab
from ...exceptions import (
- CodeExecutionError,
- JupyterProtocolError,
+ JupyterDeleteTimeoutError,
JupyterSpawnError,
- JupyterTimeoutError,
- JupyterWebError,
+ JupyterSpawnTimeoutError,
)
from ...models.business.nublado import (
NubladoBusinessData,
@@ -32,13 +33,15 @@
RunningImage,
)
from ...models.user import AuthenticatedUser
-from ...services.timings import Stopwatch
+from ...sentry import capturing_start_span, start_transaction
from .base import Business
T = TypeVar("T", bound="NubladoBusinessOptions")
__all__ = ["NubladoBusiness", "ProgressLogMessage"]
+_ANSI_REGEX = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")
+"""Regex that matches ANSI escape sequences."""
_CHDIR_TEMPLATE = 'import os; os.chdir("{wd}")'
"""Template to construct the code to run to set the working directory."""
@@ -139,6 +142,10 @@ def __init__(
self._node: str | None = None
self._random = SystemRandom()
+ # We want multiple transactions for each call to execute (one for each
+ # notebook in a NotebookRunner business, for example)
+ self.execute_transaction = False
+
@abstractmethod
async def execute_code(self, session: JupyterLabSession) -> None:
"""Execute some code inside the Jupyter lab.
@@ -157,27 +164,12 @@ async def execute_code(self, session: JupyterLabSession) -> None:
async def close(self) -> None:
await self._client.close()
- def annotations(self) -> dict[str, str]:
- """Timer annotations to use.
-
- Subclasses should override this to add more annotations based on
- current business state. They should call ``super().annotations()``
- and then add things to the resulting dictionary.
- """
- result = {}
- if self._image:
- result["image"] = (
- self._image.description
- or self._image.reference
- or ""
- )
- if self._node:
- result["node"] = self._node
- return result
-
async def startup(self) -> None:
+        # We need to start a span around this transaction because if we don't,
+ # the nested transaction shows up as "No instrumentation" in the
+ # enclosing transaction in the Sentry UI.
if self.options.jitter:
- with self.timings.start("pre_login_delay"):
+ with capturing_start_span(op="pre_login_delay"):
max_delay = self.options.jitter.total_seconds()
delay = self._random.uniform(0, max_delay)
if not await self.pause(timedelta(seconds=delay)):
@@ -186,36 +178,31 @@ async def startup(self) -> None:
if not await self._client.is_lab_stopped():
try:
await self.delete_lab()
- except JupyterTimeoutError:
+ except JupyterDeleteTimeoutError:
msg = "Unable to delete pre-existing lab, continuing anyway"
self.logger.warning(msg)
async def execute(self) -> None:
- try:
- await self._execute()
- except Exception as exc:
- monkey = getattr(exc, "monkey", None)
- event = getattr(exc, "event", "execute_code")
- if isinstance(exc, ne.CodeExecutionError):
- raise CodeExecutionError.from_client_exception(
- exc,
- monkey=monkey,
- event=event,
- annotations=self.annotations(),
- ) from exc
- raise
-
- async def _execute(self) -> None:
- if self.options.delete_lab or await self._client.is_lab_stopped():
- self._image = None
- if not await self.spawn_lab():
- return
- await self.lab_login()
+ with start_transaction(
+ name=f"{self.name} - pre execute code",
+ op=f"mobu.{self.name}.pre_execute_code",
+ ):
+ if self.options.delete_lab or await self._client.is_lab_stopped():
+ self._image = None
+ set_tag("image_description", None)
+ set_tag("image_reference", None)
+ if not await self.spawn_lab():
+ return
+ await self.lab_login()
async with self.open_session() as session:
await self.execute_code(session)
- if self.options.delete_lab:
- await self.hub_login()
- await self.delete_lab()
+ with start_transaction(
+ name=f"{self.name} - post execute code",
+ op=f"mobu.{self.name}.post_execute_code",
+ ):
+ if self.options.delete_lab:
+ await self.hub_login()
+ await self.delete_lab()
async def execution_idle(self) -> bool:
"""Pause between each unit of work execution.
@@ -224,7 +211,7 @@ async def execution_idle(self) -> bool:
subclasses in `execute_code` in between each block of code that is
executed.
"""
- with self.timings.start("execution_idle"):
+ with capturing_start_span(op="execution_idle"):
return await self.pause(self.options.execution_idle_time)
async def shutdown(self) -> None:
@@ -234,7 +221,7 @@ async def shutdown(self) -> None:
async def idle(self) -> None:
if self.options.jitter:
self.logger.info("Idling...")
- with self.timings.start("idle"):
+ with capturing_start_span(op="idle"):
extra_delay = self._random.uniform(0, self.options.jitter)
await self.pause(self.options.idle_time + extra_delay)
else:
@@ -242,58 +229,34 @@ async def idle(self) -> None:
async def hub_login(self) -> None:
self.logger.info("Logging in to hub")
- with self.timings.start("hub_login", self.annotations()) as sw:
- try:
- await self._client.auth_to_hub()
- except ne.JupyterProtocolError as exc:
- raise JupyterProtocolError.from_client_exception(
- exc,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from exc
- except ne.JupyterWebError as exc:
- raise JupyterWebError.from_client_exception(
- exc,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from exc
+ with capturing_start_span(op="hub_login"):
+ await self._client.auth_to_hub()
async def spawn_lab(self) -> bool:
- with self.timings.start("spawn_lab", self.annotations()) as sw:
+ with capturing_start_span(op="spawn_lab") as span:
try:
- result = await self._spawn_lab(sw)
+ result = await self._spawn_lab(span)
except:
await self.events.nublado_spawn_lab.publish(
NubladoSpawnLab(
success=False,
- duration=sw.elapsed,
+ duration=duration(span),
**self.common_event_attrs(),
)
)
raise
await self.events.nublado_spawn_lab.publish(
NubladoSpawnLab(
- success=True, duration=sw.elapsed, **self.common_event_attrs()
+ success=True,
+ duration=duration(span),
+ **self.common_event_attrs(),
)
)
return result
- async def _spawn_lab(self, sw: Stopwatch) -> bool: # noqa: C901
- # Ruff says this method is too complex, and it is, but it will become
- # less complex when we refactor and potentially Sentry-fy the slack
- # error reporting
+ async def _spawn_lab(self, span: Span) -> bool:
timeout = self.options.spawn_timeout
- try:
- await self._client.spawn_lab(self.options.image)
- except ne.JupyterWebError as exc:
- raise JupyterWebError.from_client_exception(
- exc,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from exc
+ await self._client.spawn_lab(self.options.image)
# Pause before using the progress API, since otherwise it may not
# have attached to the spawner and will not return a full stream
@@ -310,69 +273,34 @@ async def _spawn_lab(self, sw: Stopwatch) -> bool: # noqa: C901
log_messages.append(ProgressLogMessage(message.message))
if message.ready:
return True
- except TimeoutError:
+ except:
log = "\n".join([str(m) for m in log_messages])
- raise JupyterSpawnError(
- log,
- self.user.username,
- event=sw.event,
- started_at=sw.start_time,
- ) from None
- except ne.JupyterWebError as exc:
- raise JupyterWebError.from_client_exception(
- exc,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from exc
- except SlackException:
+ sentry_sdk.get_current_scope().add_attachment(
+ filename="spawn_log.txt",
+ bytes=self.remove_ansi_escapes(log).encode(),
+ )
raise
- except Exception as e:
- log = "\n".join([str(m) for m in log_messages])
- user = self.user.username
- raise JupyterSpawnError.from_exception(
- log,
- e,
- user,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from e
# We only fall through if the spawn failed, timed out, or if we're
# stopping the business.
if self.stopping:
return False
log = "\n".join([str(m) for m in log_messages])
- if sw.elapsed > timeout:
- elapsed_seconds = round(sw.elapsed.total_seconds())
- msg = f"Lab did not spawn after {elapsed_seconds}s"
- raise JupyterTimeoutError(
- msg,
- self.user.username,
- log,
- event=sw.event,
- started_at=sw.start_time,
- )
- raise JupyterSpawnError(
- log,
- self.user.username,
- event=sw.event,
- started_at=sw.start_time,
+ sentry_sdk.get_current_scope().add_attachment(
+ filename="spawn_log.txt",
+ bytes=self.remove_ansi_escapes(log).encode(),
)
+ spawn_duration = duration(span)
+ if spawn_duration > timeout:
+ elapsed_seconds = round(spawn_duration.total_seconds())
+ msg = f"Lab did not spawn after {elapsed_seconds}s"
+ raise JupyterSpawnTimeoutError(msg)
+ raise JupyterSpawnError
async def lab_login(self) -> None:
self.logger.info("Logging in to lab")
- with self.timings.start("lab_login", self.annotations()) as sw:
- try:
- await self._client.auth_to_lab()
- except ne.JupyterProtocolError as exc:
- raise JupyterProtocolError.from_client_exception(
- exc,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from exc
+ with capturing_start_span(op="lab_login"):
+ await self._client.auth_to_lab()
@asynccontextmanager
async def open_session(
@@ -380,19 +308,20 @@ async def open_session(
) -> AsyncIterator[JupyterLabSession]:
self.logger.info("Creating lab session")
opts = {"max_websocket_size": self.options.max_websocket_message_size}
- stopwatch = self.timings.start("create_session", self.annotations())
+ create_session_cm = capturing_start_span(op="create_session")
+ create_session_cm.__enter__()
async with self._client.open_lab_session(notebook, **opts) as session:
- stopwatch.stop()
- with self.timings.start("execute_setup", self.annotations()):
+ create_session_cm.__exit__(None, None, None)
+ with capturing_start_span(op="execute_setup"):
await self.setup_session(session)
yield session
await self.lab_login()
self.logger.info("Deleting lab session")
- stopwatch = self.timings.start(
- "delete_session", self.annotations()
- )
- stopwatch.stop()
+ delete_session_cm = capturing_start_span(op="delete_session")
+ delete_session_cm.__enter__()
+ delete_session_cm.__exit__(None, None, None)
self._node = None
+ set_tag("node", None)
async def setup_session(self, session: JupyterLabSession) -> None:
image_data = await session.run_python(_GET_IMAGE)
@@ -409,8 +338,11 @@ async def setup_session(self, session: JupyterLabSession) -> None:
reference=reference.strip() if reference else None,
description=description.strip() if description else None,
)
+ set_tag("image_description", self._image.description)
+ set_tag("image_reference", self._image.reference)
if self.options.get_node:
self._node = await session.run_python(_GET_NODE)
+ set_tag("node", self._node)
self.logger.info(f"Running on node {self._node}")
if self.options.working_directory:
path = self.options.working_directory
@@ -419,14 +351,14 @@ async def setup_session(self, session: JupyterLabSession) -> None:
await session.run_python(code)
async def delete_lab(self) -> None:
- with self.timings.start("delete_lab", self.annotations()) as sw:
+ with capturing_start_span(op="delete_lab") as span:
try:
- result = await self._delete_lab(sw)
+ result = await self._delete_lab()
except:
await self.events.nublado_delete_lab.publish(
NubladoDeleteLab(
success=False,
- duration=sw.elapsed,
+ duration=duration(span),
**self.common_event_attrs(),
)
)
@@ -437,19 +369,14 @@ async def delete_lab(self) -> None:
await self.events.nublado_delete_lab.publish(
NubladoDeleteLab(
success=True,
- duration=sw.elapsed,
+ duration=duration(span),
**self.common_event_attrs(),
)
)
- async def _delete_lab(self, sw: Stopwatch) -> bool:
+ async def _delete_lab(self) -> bool:
"""Delete a lab.
- Parameters
- ----------
- sw
- A Stopwatch to time the lab deletion
-
Returns
-------
bool
@@ -457,15 +384,7 @@ async def _delete_lab(self, sw: Stopwatch) -> bool:
didn't wait to find out if the lab was successfully deleted.
"""
self.logger.info("Deleting lab")
- try:
- await self._client.stop_lab()
- except ne.JupyterWebError as exc:
- raise JupyterWebError.from_client_exception(
- exc,
- event=sw.event,
- annotations=sw.annotations,
- started_at=sw.start_time,
- ) from exc
+ await self._client.stop_lab()
if self.stopping:
return False
@@ -479,14 +398,7 @@ async def _delete_lab(self, sw: Stopwatch) -> bool:
if elapsed > self.options.delete_timeout:
if not await self._client.is_lab_stopped(log_running=True):
msg = f"Lab not deleted after {elapsed_seconds}s"
- jte = JupyterTimeoutError(
- msg,
- self.user.username,
- started_at=start,
- event=sw.event,
- )
- jte.annotations["image"] = self.options.image.description
- raise jte
+ raise JupyterDeleteTimeoutError(msg)
msg = f"Waiting for lab deletion ({elapsed_seconds}s elapsed)"
self.logger.info(msg)
if not await self.pause(timedelta(seconds=2)):
@@ -494,9 +406,33 @@ async def _delete_lab(self, sw: Stopwatch) -> bool:
self.logger.info("Lab successfully deleted")
self._image = None
+ set_tag("image_description", None)
+ set_tag("image_reference", None)
return True
def dump(self) -> NubladoBusinessData:
return NubladoBusinessData(
image=self._image, **super().dump().model_dump()
)
+
+ def remove_ansi_escapes(self, string: str) -> str:
+ """Remove ANSI escape sequences from a string.
+
+ Jupyter labs like to format error messages with lots of ANSI
+ escape sequences, and Slack doesn't like that in messages (nor do
+ humans want to see them). Strip them out.
+
+ Based on `this StackOverflow answer
+ `__.
+
+ Parameters
+ ----------
+ string
+ String to strip ANSI escapes from.
+
+ Returns
+ -------
+ str
+ Sanitized string.
+ """
+ return _ANSI_REGEX.sub("", string)
diff --git a/src/mobu/services/business/nubladopythonloop.py b/src/mobu/services/business/nubladopythonloop.py
index 0bb43a65..a9853f46 100644
--- a/src/mobu/services/business/nubladopythonloop.py
+++ b/src/mobu/services/business/nubladopythonloop.py
@@ -8,13 +8,17 @@
from datetime import timedelta
+import sentry_sdk
from httpx import AsyncClient
from rubin.nublado.client import JupyterLabSession
+from rubin.nublado.client.exceptions import CodeExecutionError
+from safir.sentry import duration
from structlog.stdlib import BoundLogger
from ...events import Events, NubladoPythonExecution
from ...models.business.nubladopythonloop import NubladoPythonLoopOptions
from ...models.user import AuthenticatedUser
+from ...sentry import start_transaction
from .nublado import NubladoBusiness
__all__ = ["NubladoPythonLoop"]
@@ -58,15 +62,24 @@ def __init__(
async def execute_code(self, session: JupyterLabSession) -> None:
code = self.options.code
+ sentry_sdk.set_context("code_info", {"code": code})
for _count in range(self.options.max_executions):
- with self.timings.start("execute_code", self.annotations()) as sw:
+ with start_transaction(
+ name=f"{self.name} - Execute Python",
+ op="mobu.notebookrunner.execute_python",
+ ) as span:
try:
reply = await session.run_python(code)
- except:
+ except Exception as e:
+ if isinstance(e, CodeExecutionError) and e.error:
+ sentry_sdk.get_current_scope().add_attachment(
+ filename="nublado_error.txt",
+ bytes=self.remove_ansi_escapes(e.error).encode(),
+ )
await self._publish_failure(code=code)
raise
self.logger.info(f"{code} -> {reply}")
- await self._publish_success(code=code, duration=sw.elapsed)
+ await self._publish_success(code=code, duration=duration(span))
if not await self.execution_idle():
break
diff --git a/src/mobu/services/business/tap.py b/src/mobu/services/business/tap.py
index 3d51953f..60687fd2 100644
--- a/src/mobu/services/business/tap.py
+++ b/src/mobu/services/business/tap.py
@@ -10,13 +10,16 @@
import pyvo
import requests
from httpx import AsyncClient
+from safir.sentry import duration
+from sentry_sdk import set_context
from structlog.stdlib import BoundLogger
from ...dependencies.config import config_dependency
from ...events import Events, TapQuery
-from ...exceptions import CodeExecutionError, TAPClientError
+from ...exceptions import TAPClientError
from ...models.business.tap import TAPBusinessData, TAPBusinessOptions
from ...models.user import AuthenticatedUser
+from ...sentry import capturing_start_span, start_transaction
from .base import Business
T = TypeVar("T", bound="TAPBusinessOptions")
@@ -69,8 +72,7 @@ def __init__(
self._pool = ThreadPoolExecutor(max_workers=1)
async def startup(self) -> None:
- with self.timings.start("make_client"):
- self._client = self._make_client(self.user.token)
+ self._client = self._make_client(self.user.token)
@abstractmethod
def get_next_query(self) -> str:
@@ -83,40 +85,39 @@ def get_next_query(self) -> str:
"""
async def execute(self) -> None:
- query = self.get_next_query()
- with self.timings.start("execute_query", {"query": query}) as sw:
- self._running_query = query
-
- success = False
- try:
- if self.options.sync:
- await self.run_sync_query(query)
- else:
- await self.run_async_query(query)
- success = True
- except Exception as e:
- raise CodeExecutionError(
- user=self.user.username,
- code=query,
- code_type="TAP query",
- event="execute_query",
- started_at=sw.start_time,
- error=f"{type(e).__name__}: {e!s}",
- ) from e
- finally:
- await self.events.tap_query.publish(
- payload=TapQuery(
- success=success,
- duration=sw.elapsed,
- sync=self.options.sync,
- **self.common_event_attrs(),
- )
+ with start_transaction(
+ name=f"{self.name} - execute",
+ op="mobu.tap.execute",
+ ):
+ query = self.get_next_query()
+ with capturing_start_span(op="mobu.tap.execute_query") as span:
+ set_context(
+ "query_info",
+ {"query": query, "started_at": span.start_timestamp},
)
+ self._running_query = query
+
+ success = False
+ try:
+ if self.options.sync:
+ await self.run_sync_query(query)
+ else:
+ await self.run_async_query(query)
+ success = True
+ finally:
+ await self.events.tap_query.publish(
+ payload=TapQuery(
+ success=success,
+ duration=duration(span),
+ sync=self.options.sync,
+ **self.common_event_attrs(),
+ )
+ )
- self._running_query = None
- elapsed = sw.elapsed.total_seconds()
+ self._running_query = None
+ elapsed = duration(span).total_seconds()
- self.logger.info(f"Query finished after {elapsed} seconds")
+ self.logger.info(f"Query finished after {elapsed} seconds")
async def run_async_query(self, query: str) -> None:
"""Run the query asynchronously.
@@ -169,19 +170,26 @@ def _make_client(self, token: str) -> pyvo.dal.TAPService:
pyvo.dal.TAPService
TAP client object.
"""
- config = config_dependency.config
- if not config.environment_url:
- raise RuntimeError("environment_url not set")
- tap_url = str(config.environment_url).rstrip("/") + "/api/tap"
- try:
- s = requests.Session()
- s.headers["Authorization"] = "Bearer " + token
- auth = pyvo.auth.AuthSession()
- auth.credentials.set("lsst-token", s)
- auth.add_security_method_for_url(tap_url, "lsst-token")
- auth.add_security_method_for_url(tap_url + "/sync", "lsst-token")
- auth.add_security_method_for_url(tap_url + "/async", "lsst-token")
- auth.add_security_method_for_url(tap_url + "/tables", "lsst-token")
- return pyvo.dal.TAPService(tap_url, auth)
- except Exception as e:
- raise TAPClientError(e, user=self.user.username) from e
+ with capturing_start_span(op="make_client"):
+ config = config_dependency.config
+ if not config.environment_url:
+ raise RuntimeError("environment_url not set")
+ tap_url = str(config.environment_url).rstrip("/") + "/api/tap"
+ try:
+ s = requests.Session()
+ s.headers["Authorization"] = "Bearer " + token
+ auth = pyvo.auth.AuthSession()
+ auth.credentials.set("lsst-token", s)
+ auth.add_security_method_for_url(tap_url, "lsst-token")
+ auth.add_security_method_for_url(
+ tap_url + "/sync", "lsst-token"
+ )
+ auth.add_security_method_for_url(
+ tap_url + "/async", "lsst-token"
+ )
+ auth.add_security_method_for_url(
+ tap_url + "/tables", "lsst-token"
+ )
+ return pyvo.dal.TAPService(tap_url, auth)
+ except Exception as e:
+ raise TAPClientError(e) from e
diff --git a/src/mobu/services/monkey.py b/src/mobu/services/monkey.py
index 5bd70cd2..70136287 100644
--- a/src/mobu/services/monkey.py
+++ b/src/mobu/services/monkey.py
@@ -6,18 +6,16 @@
import sys
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
+import sentry_sdk
import structlog
from aiojobs import Job, Scheduler
from httpx import AsyncClient
-from safir.datetime import current_datetime, format_datetime_for_logging
from safir.logging import Profile
-from safir.slack.blockkit import SlackException, SlackMessage, SlackTextField
from safir.slack.webhook import SlackWebhookClient
from structlog.stdlib import BoundLogger
from ..dependencies.config import config_dependency
from ..events import Events
-from ..exceptions import MobuMixin
from ..models.business.base import BusinessConfig
from ..models.business.empty import EmptyLoopConfig
from ..models.business.gitlfs import GitLFSConfig
@@ -159,7 +157,7 @@ def __init__(
)
async def alert(self, exc: Exception) -> None:
- """Send an alert to Slack.
+ """Send an exception to Sentry.
Parameters
----------
@@ -170,36 +168,7 @@ async def alert(self, exc: Exception) -> None:
state = self._state.name
self._logger.info(f"Not sending alert because state is {state}")
return
- if not self._slack:
- self._logger.info("Alert hook isn't set, so not sending to Slack")
- return
- monkey = f"{self._flock}/{self._name}" if self._flock else self._name
- if isinstance(exc, MobuMixin):
- # Add the monkey info if it is not already set.
- if not exc.monkey:
- exc.monkey = monkey
- if isinstance(exc, SlackException):
- # Avoid post_exception here since it adds the application name,
- # but mobu (unusually) uses a dedicated web hook and therefore
- # doesn't need to label its alerts.
- await self._slack.post(exc.to_slack())
- else:
- now = current_datetime(microseconds=True)
- date = format_datetime_for_logging(now)
- name = type(exc).__name__
- error = f"{name}: {exc!s}"
- message = SlackMessage(
- message=f"Unexpected exception {error}",
- fields=[
- SlackTextField(heading="Exception type", text=name),
- SlackTextField(heading="Failed at", text=date),
- SlackTextField(heading="Monkey", text=monkey),
- SlackTextField(heading="User", text=self._user.username),
- ],
- )
- await self._slack.post(message)
-
- self._global_logger.info("Sent alert to Slack")
+ sentry_sdk.capture_exception(exc)
def logfile(self) -> str:
"""Get the log file for a monkey's log."""
@@ -216,15 +185,18 @@ async def run_once(self) -> str | None:
"""
self._state = MonkeyState.RUNNING
error = None
- try:
- await self.business.run_once()
- self._state = MonkeyState.FINISHED
- except Exception as e:
- msg = "Exception thrown while doing monkey business"
- self._logger.exception(msg)
- error = str(e)
- self._state = MonkeyState.ERROR
- return error
+ with sentry_sdk.isolation_scope():
+ sentry_sdk.set_user({"username": self._user.username})
+ sentry_sdk.set_tag("business", self.business.name)
+ try:
+ await self.business.run_once()
+ self._state = MonkeyState.FINISHED
+ except Exception as e:
+ msg = "Exception thrown while doing monkey business"
+ self._logger.exception(msg)
+ error = str(e)
+ self._state = MonkeyState.ERROR
+ return error
async def start(self, scheduler: Scheduler) -> None:
"""Start the monkey."""
@@ -240,35 +212,32 @@ async def _runner(self) -> None:
run = True
while run:
- try:
- self._state = MonkeyState.RUNNING
- await self.business.run()
- run = False
- except Exception as e:
- msg = "Exception thrown while doing monkey business"
- if self._flock:
- monkey = f"{self._flock}/{self._name}"
- else:
- monkey = self._name
- if isinstance(e, MobuMixin):
- e.monkey = monkey
-
- await self.alert(e)
- self._logger.exception(msg)
-
- run = self._restart and self._state == MonkeyState.RUNNING
- if run:
- self._state = MonkeyState.ERROR
- await self.business.error_idle()
- if self._state == MonkeyState.STOPPING:
- run = False
- else:
- self._state = MonkeyState.STOPPING
- msg = "Shutting down monkey due to error"
- self._global_logger.warning(msg)
-
- await self.business.close()
- self.state = MonkeyState.FINISHED
+ with sentry_sdk.isolation_scope():
+ sentry_sdk.set_tag("flock", self._flock)
+ sentry_sdk.set_user({"username": self._user.username})
+ sentry_sdk.set_tag("business", self.business.name)
+ try:
+ self._state = MonkeyState.RUNNING
+ await self.business.run()
+ run = False
+ except Exception as e:
+ msg = "Exception thrown while doing monkey business"
+ await self.alert(e)
+ self._logger.exception(msg)
+
+ run = self._restart and self._state == MonkeyState.RUNNING
+ if run:
+ self._state = MonkeyState.ERROR
+ await self.business.error_idle()
+ if self._state == MonkeyState.STOPPING:
+ run = False
+ else:
+ self._state = MonkeyState.STOPPING
+ msg = "Shutting down monkey due to error"
+ self._global_logger.warning(msg)
+
+ await self.business.close()
+ self.state = MonkeyState.FINISHED
async def stop(self) -> None:
"""Stop the monkey."""
diff --git a/src/mobu/services/timings.py b/src/mobu/services/timings.py
deleted file mode 100644
index e296168b..00000000
--- a/src/mobu/services/timings.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""Holds timing information for mobu events."""
-
-from __future__ import annotations
-
-from datetime import datetime, timedelta
-from types import TracebackType
-from typing import Literal, Self
-
-from safir.datetime import current_datetime
-
-from ..exceptions import MobuSlackException
-from ..models.timings import StopwatchData
-
-
-class Timings:
- """Holds a collection of timings.
-
- The underlying data structure is a list of `Stopwatch` objects with some
- machinery to start and stop timers.
- """
-
- def __init__(self) -> None:
- self._last: Stopwatch | None = None
- self._stopwatches: list[Stopwatch] = []
-
- def start(
- self, event: str, annotations: dict[str, str] | None = None
- ) -> Stopwatch:
- """Start a stopwatch.
-
- Examples
- --------
- This should normally be used as a context manager:
-
- .. code-block:: python
-
- with timings.start("event", annotation):
- ...
- """
- if not annotations:
- annotations = {}
- stopwatch = Stopwatch(event, annotations, self._last)
- self._stopwatches.append(stopwatch)
- self._last = stopwatch
- return stopwatch
-
- def dump(self) -> list[StopwatchData]:
- """Convert the stored timings to a dictionary."""
- return [s.dump() for s in self._stopwatches]
-
-
-class Stopwatch:
- """Container for time data.
-
- A metric container for time data and its serialization. Normally, this
- should be used as a context manager and it will automatically close the
- timer when the context manager is exited. It can instead be stored in a
- variable and stopped explicitly with ``stop`` in cases where a context
- manager isn't appropriate.
-
- Parameters
- ----------
- event
- The name of the event.
- annotation
- Arbitrary annotations.
- previous
- The previous stopwatch, used to calculate the idle time between
- timed events.
- """
-
- def __init__(
- self,
- event: str,
- annotations: dict[str, str],
- previous: Stopwatch | None = None,
- ) -> None:
- self.event = event
- self.annotations = annotations
- self.start_time = current_datetime(microseconds=True)
- self.stop_time: datetime | None = None
- self.failed = False
- self._previous = previous
-
- def __enter__(self) -> Self:
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> Literal[False]:
- self.stop_time = current_datetime(microseconds=True)
- if exc_val:
- self.failed = True
- if exc_val and isinstance(exc_val, MobuSlackException):
- exc_val.started_at = self.start_time
- exc_val.event = self.event
- exc_val.annotations = self.annotations
- return False
-
- @property
- def elapsed(self) -> timedelta:
- """Return the total time (to the present if not stopped)."""
- if self.stop_time:
- return self.stop_time - self.start_time
- else:
- return current_datetime(microseconds=True) - self.start_time
-
- def dump(self) -> StopwatchData:
- """Convert to a Pydantic model."""
- elapsed = self.stop_time - self.start_time if self.stop_time else None
- return StopwatchData(
- event=self.event,
- annotations=self.annotations,
- start=self.start_time,
- stop=self.stop_time,
- elapsed=elapsed,
- failed=self.failed,
- )
-
- def stop(self) -> None:
- """Explicitly stop the stopwatch, outside of a context manager."""
- self.stop_time = current_datetime(microseconds=True)
diff --git a/src/mobu/status.py b/src/mobu/status.py
index 67a1fee2..51065bd9 100644
--- a/src/mobu/status.py
+++ b/src/mobu/status.py
@@ -30,11 +30,11 @@ async def post_status() -> None:
flock_plural = "flock" if flock_count == 1 else "flocks"
text = (
f"Currently running {flock_count} {flock_plural} against"
- f' {str(config.environment_url).rstrip("/")}:\n'
+ f" {str(config.environment_url).rstrip('/')}:\n"
)
for summary in summaries:
if summary.start_time:
- start_time = f'started {summary.start_time.strftime("%Y-%m-%d")}'
+ start_time = f"started {summary.start_time.strftime('%Y-%m-%d')}"
else:
start_time = "(not started)"
monkey_plural = "monkey" if summary.monkey_count == 1 else "monkeys"
diff --git a/tests/autostart_test.py b/tests/autostart_test.py
index 6bdb8ed0..131c159a 100644
--- a/tests/autostart_test.py
+++ b/tests/autostart_test.py
@@ -38,7 +38,6 @@ async def test_autostart(client: AsyncClient, jupyter: MockJupyter) -> None:
"name": "EmptyLoop",
"refreshing": False,
"success_count": ANY,
- "timings": ANY,
},
"state": ANY,
"user": {
@@ -110,7 +109,6 @@ async def test_autostart(client: AsyncClient, jupyter: MockJupyter) -> None:
"name": "NubladoPythonLoop",
"refreshing": False,
"success_count": ANY,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -132,7 +130,6 @@ async def test_autostart(client: AsyncClient, jupyter: MockJupyter) -> None:
"name": "NubladoPythonLoop",
"refreshing": False,
"success_count": ANY,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
diff --git a/tests/business/gitlfs_test.py b/tests/business/gitlfs_test.py
index ed945680..7ce84deb 100644
--- a/tests/business/gitlfs_test.py
+++ b/tests/business/gitlfs_test.py
@@ -40,7 +40,6 @@ async def test_run(
"name": "GitLFSBusiness",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -95,7 +94,6 @@ async def test_fail(
"name": "GitLFSBusiness",
"refreshing": False,
"success_count": 0,
- "timings": ANY,
},
"state": ANY,
"user": {
diff --git a/tests/business/notebookrunner_test.py b/tests/business/notebookrunner_test.py
index 436ed71a..a8cb6f97 100644
--- a/tests/business/notebookrunner_test.py
+++ b/tests/business/notebookrunner_test.py
@@ -10,11 +10,11 @@
import pytest
import respx
-from anys import AnySearch
+from anys import ANY_AWARE_DATETIME_STR, AnyContains, AnySearch, AnyWithEntries
from httpx import AsyncClient
from rubin.nublado.client.testing import MockJupyter
from safir.metrics import NOT_NONE, MockEventPublisher
-from safir.testing.slack import MockSlackWebhook
+from safir.testing.sentry import Captured
from mobu.events import Events
from mobu.storage.git import Git
@@ -98,7 +98,6 @@ async def test_run(
"notebook": "test-notebook.ipynb",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -212,7 +211,6 @@ async def test_run_debug_log(
"notebook": "test-notebook.ipynb",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -288,7 +286,6 @@ async def test_run_recursive(
"notebook": ANY,
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -416,7 +413,6 @@ async def test_run_required_services(
"notebook": ANY,
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -507,7 +503,6 @@ async def test_run_all_notebooks(
"notebook": "test-notebook-has-services.ipynb",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -675,7 +670,6 @@ async def test_exclude_dirs(
"notebook": ANY,
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -719,7 +713,7 @@ async def test_invalid_repo_config(
client: AsyncClient,
respx_mock: respx.Router,
tmp_path: Path,
- slack: MockSlackWebhook,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
cwd = Path.cwd()
@@ -774,7 +768,6 @@ async def test_invalid_repo_config(
"name": "NotebookRunner",
"refreshing": False,
"success_count": 0,
- "timings": ANY,
},
"name": "bot-mobu-testuser1",
"state": "STOPPING",
@@ -787,92 +780,45 @@ async def test_invalid_repo_config(
finally:
os.chdir(cwd)
- # Make sure we sent a validation error in a Slack notification
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Error parsing config file: mobu.yaml",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nRepositoryConfigError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "*Git Ref*\nmain",
- "verbatim": True,
- },
- },
- ],
- "attachments": [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- }
- ]
- }
- ],
- }
+ # Confirm Sentry errors
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["repo_info"] == {
+ "repo_config_file": "PosixPath('mobu.yaml')",
+ "repo_hash": ANY,
+ "repo_ref": "main",
+ "repo_url": AnySearch("test_invalid_repo_config0/notebooks$"),
+ }
+ assert sentry_error["exception"]["values"] == [
+ AnyWithEntries(
+ {
+ "type": "ValidationError",
+ "value": (AnySearch("2 validation errors for RepoConfig")),
+ }
+ ),
+ AnyWithEntries(
+ {
+ "type": "RepositoryConfigError",
+ "value": ("Error parsing config file: mobu.yaml"),
+ }
+ ),
]
+ assert sentry_error["tags"] == {
+ "business": "NotebookRunner",
+ "flock": "test",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
- repo = slack.messages[0]["blocks"][2]["text"]["text"]
- assert "test_invalid_repo_config0/notebooks" in repo
-
- error = slack.messages[0]["attachments"][0]["blocks"][0]["text"]["text"]
- assert "ValidationError:" in error
- assert "2 validation errors for RepoConfig" in error
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == ("NotebookRunner - startup")
@pytest.mark.asyncio
async def test_alert(
client: AsyncClient,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
tmp_path: Path,
events: Events,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
@@ -926,7 +872,6 @@ async def test_alert(
"refreshing": False,
"running_code": bad_code,
"success_count": 0,
- "timings": ANY,
},
"state": "ERROR",
"user": {
@@ -935,101 +880,63 @@ async def test_alert(
"username": "bot-mobu-testuser1",
},
}
+ # Confirm Sentry errors
+ (sentry_error,) = sentry_items.errors
- # Check that an appropriate error was posted.
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "Error while running `exception.ipynb`"
- " cell `ed399c0a`"
- ),
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nCodeExecutionError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Image*\nRecommended (Weekly 2077_43)",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nexecute_code",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "*Node*\nNode1",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "*Cell*\n`exception.ipynb` cell `ed399c0a` (#1)"
- ),
- "verbatim": True,
- },
- },
- ],
- "attachments": [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- f"*Code executed*\n```\n{bad_code}\n```"
- ),
- "verbatim": True,
- },
- },
- ]
- }
- ],
- }
- ]
- error = slack.messages[0]["attachments"][0]["blocks"][0]["text"]["text"]
- assert "KeyError: 'nothing'" in error
+ assert sentry_error["contexts"]["cell_info"] == {
+ "code": 'foo = {"bar": "baz"}\nfoo["nothing"]',
+ "cell_id": "ed399c0a",
+ "cell_number": "#1",
+ }
+ assert sentry_error["contexts"]["notebook_filter_info"] == {
+ "all": [AnySearch("/exception.ipynb$")],
+ "excluded_by_dir": [],
+ "excluded_by_requested": [],
+ "excluded_by_service": [],
+ "runnable": [AnySearch("/exception.ipynb$")],
+ }
+ assert sentry_error["contexts"]["notebook_info"] == {
+ "iteration": "1/1",
+ "notebook": "exception.ipynb",
+ }
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "execute_cell",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["contexts"]["repo_info"] == {
+ "repo_config_file": "PosixPath('mobu.yaml')",
+ "repo_hash": ANY,
+ "repo_ref": "main",
+ "repo_url": AnySearch(r"/notebooks$"),
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "NotebookCellExecutionError",
+ "value": ("exception.ipynb: Error executing cell"),
+ }
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NotebookRunner",
+ "cell": "ed399c0a",
+ "flock": "test",
+ "image_description": "Recommended (Weekly 2077_43)",
+ "image_reference": "lighthouse.ceres/library/sketchbook:recommended",
+ "node": "Node1",
+ "notebook": "exception.ipynb",
+ "phase": "execute_cell",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_attachment,) = sentry_items.attachments
+ assert sentry_attachment.filename == "nublado_error.txt"
+ assert "KeyError: 'nothing'" in sentry_attachment.bytes.decode()
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NotebookRunner - Execute notebook"
+ )
# Check events
common = {
diff --git a/tests/business/nubladopythonloop_test.py b/tests/business/nubladopythonloop_test.py
index d4afe6d7..f5ede1a1 100644
--- a/tests/business/nubladopythonloop_test.py
+++ b/tests/business/nubladopythonloop_test.py
@@ -10,6 +10,7 @@
import pytest
import respx
+from anys import ANY_AWARE_DATETIME_STR, AnyContains, AnyWithEntries
from httpx import AsyncClient
from rubin.nublado.client.testing import (
JupyterAction,
@@ -17,6 +18,7 @@
MockJupyter,
)
from safir.metrics import NOT_NONE, MockEventPublisher
+from safir.testing.sentry import Captured
from safir.testing.slack import MockSlackWebhook
from mobu.dependencies.config import config_dependency
@@ -64,7 +66,6 @@ async def test_run(
"name": "NubladoPythonLoop",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -232,9 +233,9 @@ async def test_delayed_delete(
async def test_hub_failed(
client: AsyncClient,
jupyter: MockJupyter,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
events: Events,
+ sentry_items: Captured,
) -> None:
config = config_dependency.config
mock_gafaelfawr(respx_mock)
@@ -260,59 +261,35 @@ async def test_hub_failed(
assert data["business"]["success_count"] == 0
assert data["business"]["failure_count"] > 0
- # Check that an appropriate error was posted.
+ # Confirm Sentry events
+ (sentry_error,) = sentry_items.errors
assert config.environment_url
url = urljoin(str(config.environment_url), "/nb/hub/spawn")
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": f"Status 500 from POST {url}",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nJupyterWebError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser2",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nspawn_lab",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser2",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": f"*URL*\nPOST {url}",
- "verbatim": True,
- },
- },
- {"type": "divider"},
- ]
- }
- ]
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "spawn_lab",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "JupyterWebError",
+ "value": (f"Status 500 from POST {url}"),
+ }
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_reference": None,
+ "image_description": None,
+ "phase": "spawn_lab",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser2"}
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - pre execute code"
+ )
# Check events
publisher = cast(MockEventPublisher, events.nublado_spawn_lab)
@@ -341,10 +318,9 @@ async def test_hub_failed(
async def test_redirect_loop(
client: AsyncClient,
jupyter: MockJupyter,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
- config = config_dependency.config
mock_gafaelfawr(respx_mock)
jupyter.redirect_loop = True
@@ -369,73 +345,46 @@ async def test_redirect_loop(
assert data["business"]["failure_count"] > 0
# Check that an appropriate error was posted.
- assert config.environment_url
- url = urljoin(
- str(config.environment_url),
- "/nb/hub/api/users/bot-mobu-testuser1/server/progress",
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "JupyterWebError",
+ "value": (
+ "TooManyRedirects: Exceeded maximum allowed redirects."
+ ),
+ }
+ )
)
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_reference": None,
+ "image_description": None,
+ "phase": "spawn_lab",
+ }
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "spawn_lab",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "TooManyRedirects: Exceeded maximum allowed"
- " redirects."
- ),
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nJupyterWebError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nspawn_lab",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": f"*URL*\nGET {url}",
- "verbatim": True,
- },
- },
- {"type": "divider"},
- ]
- }
- ]
+ (sentry_attachment,) = sentry_items.attachments
+ assert sentry_attachment.filename == "spawn_log.txt"
+ assert sentry_attachment.bytes.decode() == ""
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - pre execute code"
+ )
@pytest.mark.asyncio
async def test_spawn_timeout(
client: AsyncClient,
jupyter: MockJupyter,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
jupyter.spawn_timeout = True
@@ -460,56 +409,45 @@ async def test_spawn_timeout(
data = await wait_for_business(client, "bot-mobu-testuser1")
assert data["business"]["success_count"] == 0
assert data["business"]["failure_count"] > 0
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Lab did not spawn after 1s",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nJupyterTimeoutError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nspawn_lab",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {"type": "divider"},
- ]
- }
- ]
+
+ # Check that an appropriate error was posted.
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "spawn_lab",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "JupyterSpawnTimeoutError",
+ "value": ("Lab did not spawn after 1s"),
+ }
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_reference": None,
+ "image_description": None,
+ "phase": "spawn_lab",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_attachment,) = sentry_items.attachments
+ assert sentry_attachment.filename == "spawn_log.txt"
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - pre execute code"
+ )
@pytest.mark.asyncio
async def test_spawn_failed(
client: AsyncClient,
jupyter: MockJupyter,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
jupyter.fail("bot-mobu-testuser1", JupyterAction.PROGRESS)
@@ -534,68 +472,56 @@ async def test_spawn_failed(
data = await wait_for_business(client, "bot-mobu-testuser1")
assert data["business"]["success_count"] == 0
assert data["business"]["failure_count"] > 0
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Spawning lab failed",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nJupyterSpawnError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nspawn_lab",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {"type": "mrkdwn", "text": ANY, "verbatim": True},
- },
- {"type": "divider"},
- ],
- }
- ]
- log = slack.messages[0]["blocks"][2]["text"]["text"]
- log = re.sub(r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d(.\d\d\d)?", "", log)
+
+ # Check that an appropriate error was posted.
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "spawn_lab",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "JupyterSpawnError",
+ "value": "",
+ }
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_reference": None,
+ "image_description": None,
+ "phase": "spawn_lab",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_attachment,) = sentry_items.attachments
+ assert sentry_attachment.filename == "spawn_log.txt"
+
+ log = re.sub(
+ r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d(.\d\d\d)?",
+ "",
+ sentry_attachment.bytes.decode(),
+ )
assert log == (
- "*Log*\n"
" - Server requested\n"
" - Spawning server...\n"
" - Spawn failed!"
)
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - pre execute code"
+ )
+
@pytest.mark.asyncio
async def test_delete_timeout(
client: AsyncClient,
jupyter: MockJupyter,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
jupyter.delete_immediate = False
@@ -627,56 +553,43 @@ async def test_delete_timeout(
data = await wait_for_business(client, "bot-mobu-testuser1")
assert data["business"]["success_count"] == 0
assert data["business"]["failure_count"] > 0
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Lab not deleted after 2s",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nJupyterTimeoutError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\ndelete_lab",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {"type": "divider"},
- ]
- }
- ]
+
+ # Check that an appropriate error was posted.
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "delete_lab",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "JupyterDeleteTimeoutError",
+ "value": "Lab not deleted after 2s",
+ }
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_description": "Recommended (Weekly 2077_43)",
+ "image_reference": "lighthouse.ceres/library/sketchbook:recommended",
+ "node": None,
+ "phase": "delete_lab",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - post execute code"
+ )
@pytest.mark.asyncio
async def test_code_exception(
client: AsyncClient,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
events: Events,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
@@ -704,87 +617,31 @@ async def test_code_exception(
assert data["business"]["failure_count"] == 1
# Check that an appropriate error was posted.
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Error while running code",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nCodeExecutionError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Image*\nRecommended (Weekly 2077_43)",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nexecute_code",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "*Node*\nNode1",
- "verbatim": True,
- },
- },
- ],
- "attachments": [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "*Code executed*\n"
- '```\nraise Exception("some error")\n```'
- ),
- "verbatim": True,
- },
- },
- ]
- }
- ],
- }
- ]
- error = slack.messages[0]["attachments"][0]["blocks"][0]["text"]["text"]
- assert "Exception: some error" in error
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["code_info"] == {
+ "code": 'raise Exception("some error")'
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "CodeExecutionError",
+ "value": "Code execution failed",
+ }
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_description": "Recommended (Weekly 2077_43)",
+ "image_reference": "lighthouse.ceres/library/sketchbook:recommended",
+ "node": "Node1",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - Execute Python"
+ )
# Check events
publisher = cast(MockEventPublisher, events.nublado_python_execution)
@@ -809,6 +666,7 @@ async def test_long_error(
jupyter: MockJupyter,
slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
@@ -842,90 +700,40 @@ async def test_long_error(
}
# Check that an appropriate error was posted.
- error = "... truncated ...\n"
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "CodeExecutionError",
+ "value": "Code execution failed",
+ }
+ )
+ )
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_description": "Recommended (Weekly 2077_43)",
+ "image_reference": "lighthouse.ceres/library/sketchbook:recommended",
+ "node": "Node1",
+ }
+ assert sentry_error["contexts"]["code_info"] == {
+ "code": "long_error_for_test()"
+ }
+
+ # Check that an appropriate error attachment was captured.
+ (sentry_attachment,) = sentry_items.attachments
+ text = sentry_attachment.bytes.decode()
+ error = ""
line = "this is a single line of output to test trimming errors"
- for i in range(5, 54):
+ for i in range(54):
error += f"{line} #{i}\n"
- assert 2977 - len(line) <= len(error) <= 2977
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Error while running code",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nCodeExecutionError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Image*\nRecommended (Weekly 2077_43)",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nexecute_code",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "*Node*\nNode1",
- "verbatim": True,
- },
- },
- ],
- "attachments": [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": f"*Error*\n```\n{error}```",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "*Code executed*\n"
- "```\nlong_error_for_test()\n```"
- ),
- "verbatim": True,
- },
- },
- ]
- }
- ],
- }
- ]
+ assert text == error
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - Execute Python"
+ )
@pytest.mark.asyncio
@@ -1029,8 +837,8 @@ async def test_lab_controller(
async def test_ansi_error(
client: AsyncClient,
jupyter: MockJupyter,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
@@ -1067,86 +875,30 @@ async def test_ansi_error(
assert data["business"]["failure_count"] == 1
# Check that an appropriate error was posted.
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Error while running code",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nCodeExecutionError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Image*\nRecommended (Weekly 2077_43)",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nexecute_code",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "*Node*\nNode1",
- "verbatim": True,
- },
- },
- ],
- "attachments": [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "*Code executed*\n"
- '```\nraise ValueError("\\033[38;5;28;01m'
- 'Foo\\033[39;00m")\n```'
- ),
- "verbatim": True,
- },
- },
- ]
- }
- ],
- }
- ]
- error = slack.messages[0]["attachments"][0]["blocks"][0]["text"]["text"]
+ (sentry_attachment,) = sentry_items.attachments
+ error = sentry_attachment.bytes.decode()
assert "ValueError: Foo" in error
assert "\033" not in error
+
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["code_info"] == {
+ "code": 'raise ValueError("\\033[38;5;28;01mFoo\\033[39;00m")'
+ }
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {"type": "CodeExecutionError", "value": "Code execution failed"}
+ )
+ )
+ assert sentry_error["tags"] == {
+ "business": "NubladoPythonLoop",
+ "flock": "test",
+ "image_description": "Recommended (Weekly 2077_43)",
+ "image_reference": "lighthouse.ceres/library/sketchbook:recommended",
+ "node": "Node1",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == (
+ "NubladoPythonLoop - Execute Python"
+ )
diff --git a/tests/business/tapqueryrunner_test.py b/tests/business/tapqueryrunner_test.py
index 58abfaab..7cf98b61 100644
--- a/tests/business/tapqueryrunner_test.py
+++ b/tests/business/tapqueryrunner_test.py
@@ -52,7 +52,6 @@ async def test_run(
"name": "TAPQueryRunner",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
diff --git a/tests/business/tapquerysetrunner_test.py b/tests/business/tapquerysetrunner_test.py
index ded17f6f..edb4dfa1 100644
--- a/tests/business/tapquerysetrunner_test.py
+++ b/tests/business/tapquerysetrunner_test.py
@@ -11,10 +11,11 @@
import respx
import structlog
import yaml
+from anys import ANY_AWARE_DATETIME_STR, AnyContains, AnySearch, AnyWithEntries
from httpx import AsyncClient
from safir.dependencies.http_client import http_client_dependency
from safir.metrics import NOT_NONE, MockEventPublisher
-from safir.testing.slack import MockSlackWebhook
+from safir.testing.sentry import Captured
import mobu
from mobu.events import Events
@@ -54,7 +55,6 @@ async def test_run(
"name": "TAPQuerySetRunner",
"refreshing": False,
"success_count": 1,
- "timings": ANY,
},
"state": "RUNNING",
"user": {
@@ -91,8 +91,8 @@ async def test_run(
@pytest.mark.asyncio
async def test_setup_error(
client: AsyncClient,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
+ sentry_items: Captured,
) -> None:
"""Test that client creation is deferred to setup.
@@ -117,62 +117,40 @@ async def test_setup_error(
data = await wait_for_business(client, "bot-mobu-tapuser")
assert data["business"]["failure_count"] == 1
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "Unable to create TAP client: DALServiceError:"
- " No working capabilities endpoint provided"
- ),
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nTAPClientError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-tapuser",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nmake_client",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-tapuser",
- "verbatim": True,
- },
- ],
- },
- {"type": "divider"},
- ]
- }
- ]
+ # Confirm Sentry events
+ (sentry_error,) = sentry_items.errors
+
+ assert sentry_error["exception"]["values"] == AnyContains(
+ AnyWithEntries(
+ {
+ "type": "DALServiceError",
+ "value": "No working capabilities endpoint provided",
+ }
+ )
+ )
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "make_client",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["tags"] == {
+ "flock": "test",
+ "business": "TAPQuerySetRunner",
+ "phase": "make_client",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-tapuser"}
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == "TAPQuerySetRunner - startup"
@pytest.mark.asyncio
async def test_failure(
client: AsyncClient,
- slack: MockSlackWebhook,
respx_mock: respx.Router,
events: Events,
+ sentry_items: Captured,
) -> None:
mock_gafaelfawr(respx_mock)
-
with patch.object(pyvo.dal, "TAPService") as mock:
mock.return_value.search.side_effect = [Exception("some error")]
@@ -192,6 +170,26 @@ async def test_failure(
data = await wait_for_business(client, "bot-mobu-testuser1")
assert data["business"]["failure_count"] == 1
+ # Confirm Sentry errors
+ (sentry_error,) = sentry_items.errors
+ assert sentry_error["contexts"]["phase"] == {
+ "phase": "mobu.tap.execute_query",
+ "started_at": ANY_AWARE_DATETIME_STR,
+ }
+ assert sentry_error["contexts"]["query_info"] == {
+ "started_at": ANY_AWARE_DATETIME_STR,
+ "query": AnySearch("SELECT"),
+ }
+ assert sentry_error["tags"] == {
+ "flock": "test",
+ "business": "TAPQuerySetRunner",
+ "phase": "mobu.tap.execute_query",
+ }
+ assert sentry_error["user"] == {"username": "bot-mobu-testuser1"}
+
+ (sentry_transaction,) = sentry_items.transactions
+ assert sentry_transaction["transaction"] == "TAPQuerySetRunner - execute"
+
# Confirm metrics events
published = cast(MockEventPublisher, events.tap_query).published
published.assert_published_all(
@@ -207,72 +205,6 @@ async def test_failure(
]
)
- assert slack.messages == [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": "Error while running TAP query",
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "fields": [
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {"type": "mrkdwn", "text": ANY, "verbatim": True},
- {
- "type": "mrkdwn",
- "text": "*Exception type*\nCodeExecutionError",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*User*\nbot-mobu-testuser1",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Event*\nexecute_query",
- "verbatim": True,
- },
- {
- "type": "mrkdwn",
- "text": "*Monkey*\ntest/bot-mobu-testuser1",
- "verbatim": True,
- },
- ],
- },
- ],
- "attachments": [
- {
- "blocks": [
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": (
- "*Error*\n```\nException: some error\n```"
- ),
- "verbatim": True,
- },
- },
- {
- "type": "section",
- "text": {
- "type": "mrkdwn",
- "text": ANY,
- "verbatim": True,
- },
- },
- ]
- }
- ],
- }
- ]
-
@pytest.mark.asyncio
async def test_random_object(events: Events) -> None:
diff --git a/tests/conftest.py b/tests/conftest.py
index df7ab6a0..80f7d564 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,7 +2,7 @@
from __future__ import annotations
-from collections.abc import AsyncIterator, Iterator
+from collections.abc import AsyncIterator, Generator, Iterator
from contextlib import asynccontextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -24,6 +24,11 @@
mock_jupyter,
mock_jupyter_websocket,
)
+from safir.testing.sentry import (
+ Captured,
+ capture_events_fixture,
+ sentry_init_fixture,
+)
from safir.testing.slack import MockSlackWebhook, mock_slack_webhook
from structlog.stdlib import BoundLogger
@@ -31,6 +36,7 @@
from mobu.dependencies.config import config_dependency
from mobu.dependencies.context import context_dependency
from mobu.events import Events
+from mobu.sentry import before_send, send_all_error_transactions
from mobu.services.business.gitlfs import GitLFSBusiness
from mobu.services.business.nublado import _GET_IMAGE, _GET_NODE
@@ -286,3 +292,18 @@ def github_mocker() -> Iterator[GitHubMocker]:
github_mocker = GitHubMocker()
with github_mocker.router:
yield github_mocker
+
+
+@pytest.fixture
+def sentry_items(
+ monkeypatch: pytest.MonkeyPatch,
+) -> Generator[Captured]:
+ """Mock Sentry transport and yield a list that will contain all events."""
+ with sentry_init_fixture() as init:
+ init(
+ traces_sample_rate=1.0,
+ before_send=before_send,
+ before_send_transaction=send_all_error_transactions,
+ )
+ events = capture_events_fixture(monkeypatch)
+ yield events()
diff --git a/tests/data/config/autostart.yaml b/tests/data/config/autostart.yaml
index f32ca4cb..a6331170 100644
--- a/tests/data/config/autostart.yaml
+++ b/tests/data/config/autostart.yaml
@@ -1,5 +1,6 @@
slackAlerts: true
environmentUrl: "https://example.com"
+sentryEnvironment: "pytest"
metrics:
enabled: false
mock: true
diff --git a/tests/data/config/base.yaml b/tests/data/config/base.yaml
index f7abc651..27c86c4a 100644
--- a/tests/data/config/base.yaml
+++ b/tests/data/config/base.yaml
@@ -1,5 +1,6 @@
slackAlerts: true
environmentUrl: "https://example.com"
+sentryEnvironment: "pytest"
availableServices:
- some_service
- some_other_service
diff --git a/tests/data/config/github_ci_app.yaml b/tests/data/config/github_ci_app.yaml
index c51c5f2c..89a2b567 100644
--- a/tests/data/config/github_ci_app.yaml
+++ b/tests/data/config/github_ci_app.yaml
@@ -1,5 +1,6 @@
slackAlerts: true
environmentUrl: "https://example.com"
+sentryEnvironment: "pytest"
metrics:
enabled: false
mock: true
diff --git a/tests/data/config/github_refresh_app.yaml b/tests/data/config/github_refresh_app.yaml
index 9d86d1c2..dfbb2395 100644
--- a/tests/data/config/github_refresh_app.yaml
+++ b/tests/data/config/github_refresh_app.yaml
@@ -1,5 +1,6 @@
slackAlerts: true
environmentUrl: "https://example.com"
+sentryEnvironment: "pytest"
metrics:
enabled: false
mock: true
diff --git a/tests/handlers/flock_test.py b/tests/handlers/flock_test.py
index 09b3eb40..7745a68a 100644
--- a/tests/handlers/flock_test.py
+++ b/tests/handlers/flock_test.py
@@ -53,7 +53,6 @@ async def test_start_stop_refresh(
"name": "EmptyLoop",
"refreshing": False,
"success_count": ANY,
- "timings": ANY,
},
"state": ANY,
"user": {
@@ -175,7 +174,6 @@ async def test_user_list(
"name": "EmptyLoop",
"refreshing": False,
"success_count": ANY,
- "timings": ANY,
},
"state": ANY,
"user": {
@@ -193,7 +191,6 @@ async def test_user_list(
"name": "EmptyLoop",
"refreshing": False,
"success_count": ANY,
- "timings": ANY,
},
"state": ANY,
"user": {
diff --git a/tests/monkeyflocker_test.py b/tests/monkeyflocker_test.py
index 19c84523..0c745096 100644
--- a/tests/monkeyflocker_test.py
+++ b/tests/monkeyflocker_test.py
@@ -90,7 +90,6 @@ def test_start_report_refresh_stop(
"name": "EmptyLoop",
"refreshing": ANY,
"success_count": ANY,
- "timings": ANY,
},
"state": ANY,
"user": {
diff --git a/tests/timings_test.py b/tests/timings_test.py
deleted file mode 100644
index 34812f18..00000000
--- a/tests/timings_test.py
+++ /dev/null
@@ -1,72 +0,0 @@
-"""Test the Timings class."""
-
-from __future__ import annotations
-
-from datetime import timedelta
-
-import pytest
-from safir.datetime import current_datetime
-
-from mobu.models.timings import StopwatchData
-from mobu.services.timings import Timings
-
-
-def test_timings() -> None:
- timings = Timings()
- assert timings.dump() == []
-
- now = current_datetime(microseconds=True)
- with timings.start("something") as sw:
- assert sw.event == "something"
- assert sw.annotations == {}
- assert now + timedelta(seconds=5) > sw.start_time >= now
- assert sw.stop_time is None
- elapsed = sw.elapsed
- assert elapsed <= current_datetime(microseconds=True) - sw.start_time
- old_elapsed = sw.elapsed
-
- first_sw = sw
- assert first_sw.stop_time
- assert first_sw.stop_time > first_sw.start_time
- assert first_sw.elapsed == first_sw.stop_time - first_sw.start_time
- assert first_sw.elapsed >= old_elapsed
-
- with pytest.raises(ValueError, match="some exception"):
- with timings.start("else", {"foo": "bar"}) as sw:
- assert sw.annotations == {"foo": "bar"}
- assert sw.stop_time is None
- raise ValueError("some exception")
-
- second_sw = sw
- assert second_sw.stop_time
- assert second_sw.stop_time > second_sw.start_time
- assert second_sw.elapsed == second_sw.stop_time - second_sw.start_time
-
- assert timings.dump() == [
- StopwatchData(
- event="something",
- annotations={},
- start=first_sw.start_time,
- stop=first_sw.stop_time,
- elapsed=first_sw.elapsed,
- failed=False,
- ),
- StopwatchData(
- event="else",
- annotations={"foo": "bar"},
- start=second_sw.start_time,
- stop=second_sw.stop_time,
- elapsed=second_sw.elapsed,
- failed=True,
- ),
- ]
-
- with timings.start("incomplete") as sw:
- dump = timings.dump()
- assert dump[2] == StopwatchData(
- event="incomplete",
- annotations={},
- start=sw.start_time,
- stop=None,
- elapsed=None,
- )