Skip to content

Commit

Permalink
Merge pull request #349 from lsst-sqre/tickets/DM-44145/notebook-refresh
Browse files Browse the repository at this point in the history
DM-44461: Refresh notebooks
  • Loading branch information
fajpunk committed May 30, 2024
2 parents 4e04dff + 3f3a206 commit 188df6a
Show file tree
Hide file tree
Showing 35 changed files with 1,086 additions and 284 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: check-toml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.4
rev: v0.4.6
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
6 changes: 6 additions & 0 deletions changelog.d/20240530_151558_danfuchs_notebook_refresh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### New features

- `NotebookRunner` flocks can now pick up changes to their notebooks without having to restart the whole mobu process. This refresh can happen via:
- GitHub `push` webhook post to `/mobu/github/webhook` with changes to a repo and branch that matches the flock config
- `monkeyflocker refresh <flock>`
- `POST` to `/mobu/flocks/{flock}/refresh`
176 changes: 86 additions & 90 deletions requirements/dev.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ uvicorn[standard]
# Other dependencies.
aiojobs
click!=8.1.4,!=8.1.5 # see https://github.com/pallets/click/issues/2558
gidgethub
httpx
httpx-sse
jinja2
Expand Down
364 changes: 183 additions & 181 deletions requirements/main.txt

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions src/mobu/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ class Configuration(BaseSettings):
examples=["gt-vilSCi1ifK_MyuaQgMD2dQ.d6SIJhowv5Hs3GvujOyUig"],
)

github_webhook_secret: str | None = Field(
None,
title="Github webhook secret",
description=(
"Any repo that wants mobu to automatically respawn labs when"
" notebooks change must use this secret in its webhook"
" configuration in GitHub."
),
validation_alias="MOBU_GITHUB_WEBHOOK_SECRET",
)

name: str = Field(
"mobu",
title="Name of application",
Expand Down
4 changes: 4 additions & 0 deletions src/mobu/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
from datetime import timedelta

__all__ = [
"GITHUB_WEBHOOK_WAIT_SECONDS",
"NOTEBOOK_REPO_URL",
"NOTEBOOK_REPO_BRANCH",
"TOKEN_LIFETIME",
"USERNAME_REGEX",
"WEBSOCKET_OPEN_TIMEOUT",
]

GITHUB_WEBHOOK_WAIT_SECONDS = 1
"""GithHub needs some time to actually be in the state in a webhook payload."""

NOTEBOOK_REPO_URL = "https://github.com/lsst-sqre/notebook-demo.git"
"""Default notebook repository for NotebookRunner."""

Expand Down
23 changes: 22 additions & 1 deletion src/mobu/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"""

from dataclasses import dataclass
from typing import Annotated
from typing import Annotated, Any

from fastapi import Depends, Request
from safir.dependencies.gafaelfawr import auth_logger_dependency
from safir.dependencies.http_client import http_client_dependency
from safir.dependencies.logger import logger_dependency
from structlog.stdlib import BoundLogger

from ..factory import Factory, ProcessContext
Expand All @@ -21,6 +22,7 @@
"ContextDependency",
"RequestContext",
"context_dependency",
"anonymous_context_dependency",
]


Expand All @@ -40,6 +42,17 @@ class RequestContext:
factory: Factory
"""Component factory."""

def rebind_logger(self, **values: Any) -> None:
"""Add the given values to the logging context.
Parameters
----------
**values
Additional values that should be added to the logging context.
"""
self.logger = self.logger.bind(**values)
self.factory.set_logger(self.logger)


class ContextDependency:
"""Provide a per-request context as a FastAPI dependency.
Expand Down Expand Up @@ -90,3 +103,11 @@ async def aclose(self) -> None:

context_dependency = ContextDependency()
"""The dependency that will return the per-request context."""


async def anonymous_context_dependency(
request: Request,
logger: Annotated[BoundLogger, Depends(logger_dependency)],
) -> RequestContext:
"""Per-request context for non-gafaelfawr-auth'd requests."""
return await context_dependency(request=request, logger=logger)
13 changes: 13 additions & 0 deletions src/mobu/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,16 @@ def create_solitary(self, solitary_config: SolitaryConfig) -> Solitary:
http_client=self._context.http_client,
logger=self._logger,
)

def set_logger(self, logger: BoundLogger) -> None:
"""Replace the internal logger.
Used by the context dependency to update the logger for all
newly-created components when it's rebound with additional context.
Parameters
----------
logger
New logger.
"""
self._logger = logger
14 changes: 14 additions & 0 deletions src/mobu/handlers/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ async def get_flock(
return context.manager.get_flock(flock).dump()


@external_router.post(
"/flocks/{flock}/refresh",
responses={404: {"description": "Flock not found", "model": ErrorModel}},
status_code=202,
summary="Signal a flock to refresh",
)
async def refresh_flock(
flock: str,
context: Annotated[RequestContext, Depends(context_dependency)],
) -> None:
context.logger.info("Signaling flock to refresh", flock=flock)
context.manager.refresh_flock(flock)


@external_router.delete(
"/flocks/{flock}",
responses={404: {"description": "Flock not found", "model": ErrorModel}},
Expand Down
45 changes: 45 additions & 0 deletions src/mobu/handlers/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Handlers for requests from GitHub, ``/mobu/github``."""

import asyncio
from typing import Annotated

from fastapi import APIRouter, Depends
from gidgethub.sansio import Event
from safir.slack.webhook import SlackRouteErrorHandler

from ..config import config
from ..constants import GITHUB_WEBHOOK_WAIT_SECONDS
from ..dependencies.context import RequestContext, anonymous_context_dependency
from .github_webhooks import webhook_router

github_router = APIRouter(route_class=SlackRouteErrorHandler)


@github_router.post(
"/webhook",
summary="GitHub webhooks",
description="This endpoint receives webhook events from GitHub.",
status_code=202,
)
async def post_github_webhook(
context: Annotated[RequestContext, Depends(anonymous_context_dependency)],
) -> None:
"""Process GitHub webhook events.
This should be exposed via a Gafaelfawr anonymous ingress.
"""
webhook_secret = config.github_webhook_secret
body = await context.request.body()
event = Event.from_http(
context.request.headers, body, secret=webhook_secret
)

# Bind the X-GitHub-Delivery header to the logger context; this
# identifies the webhook request in GitHub's API and UI for
# diagnostics
context.rebind_logger(github_delivery=event.delivery_id)

context.logger.debug("Received GitHub webhook", payload=event.data)
# Give GitHub some time to reach internal consistency.
await asyncio.sleep(GITHUB_WEBHOOK_WAIT_SECONDS)
await webhook_router.dispatch(event=event, context=context)
39 changes: 39 additions & 0 deletions src/mobu/handlers/github_webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Github webhook handlers."""

from gidgethub import routing
from gidgethub.sansio import Event

from ..dependencies.context import RequestContext

__all__ = ["webhook_router"]

webhook_router = routing.Router()


@webhook_router.register("push")
async def handle_push(event: Event, context: RequestContext) -> None:
"""Handle a push event."""
ref = event.data["ref"]
url = event.data["repository"]["clone_url"]
context.rebind_logger(ref=ref, url=url)

prefix, branch = ref.rsplit("/", 1)
if prefix != "refs/heads":
context.logger.debug(
"github webhook ignored: ref is not a branch",
)
return

flocks = context.manager.list_flocks_for_repo(
repo_url=url, repo_branch=branch
)
if not flocks:
context.logger.debug(
"github webhook ignored: no flocks match repo and branch",
)
return

for flock in flocks:
context.manager.refresh_flock(flock)

context.logger.info("github webhook handled")
2 changes: 2 additions & 0 deletions src/mobu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .config import config
from .dependencies.context import context_dependency
from .handlers.external import external_router
from .handlers.github import github_router
from .handlers.internal import internal_router
from .status import post_status

Expand Down Expand Up @@ -69,6 +70,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# Attach the routers.
app.include_router(internal_router)
app.include_router(external_router, prefix=config.path_prefix)
app.include_router(github_router, prefix=f"{config.path_prefix}/github")

# Add middleware.
app.add_middleware(XForwardedMiddleware)
Expand Down
4 changes: 4 additions & 0 deletions src/mobu/models/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class BusinessData(BaseModel):

success_count: int = Field(..., title="Number of successes", examples=[25])

refreshing: bool = Field(
..., title="If the business is currently in the process of refreshing"
)

timings: list[StopwatchData] = Field(..., title="Timings of events")

model_config = ConfigDict(extra="forbid")
5 changes: 5 additions & 0 deletions src/mobu/services/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(
self.timings = Timings()
self.control: Queue[BusinessCommand] = Queue()
self.stopping = False
self.refreshing = False

# Methods that should be overridden by child classes if needed.

Expand Down Expand Up @@ -204,6 +205,9 @@ async def stop(self) -> None:
await self.control.join()
self.logger.info("Stopped")

def signal_refresh(self) -> None:
self.refreshing = True

# Utility functions that can be used by child classes.

async def pause(self, interval: timedelta) -> bool:
Expand Down Expand Up @@ -299,6 +303,7 @@ def dump(self) -> BusinessData:
name=type(self).__name__,
failure_count=self.failure_count,
success_count=self.success_count,
refreshing=self.refreshing,
timings=self.timings.dump(),
)

Expand Down
23 changes: 20 additions & 3 deletions src/mobu/services/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,34 @@ def annotations(self, cell_id: str | None = None) -> dict[str, str]:
return result

async def startup(self) -> None:
await self.initialize()
await super().startup()

async def cleanup(self) -> None:
shutil.rmtree(str(self._repo_dir))
self._repo_dir = None

async def initialize(self) -> None:
if self._repo_dir is None:
self._repo_dir = Path(TemporaryDirectory().name)
await self.clone_repo()

self._exclude_paths = {
(self._repo_dir / path) for path in self.options.exclude_dirs
}
self._notebook_paths = self.find_notebooks()
self.logger.info("Repository cloned and ready")
await super().startup()

async def shutdown(self) -> None:
shutil.rmtree(str(self._repo_dir))
self._repo_dir = None
await self.cleanup()
await super().shutdown()

async def refresh(self) -> None:
self.logger.info("Recloning notebooks and forcing new execution")
await self.cleanup()
await self.initialize()
self.refreshing = False

async def clone_repo(self) -> None:
url = self.options.repo_url
branch = self.options.repo_branch
Expand Down Expand Up @@ -151,6 +164,10 @@ async def open_session(

async def execute_code(self, session: JupyterLabSession) -> None:
for count in range(self.options.max_executions):
if self.refreshing:
await self.refresh()
return

self._notebook = self.next_notebook()

iteration = f"{count + 1}/{self.options.max_executions}"
Expand Down
23 changes: 23 additions & 0 deletions src/mobu/services/flock.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from structlog.stdlib import BoundLogger

from ..exceptions import MonkeyNotFoundError
from ..models.business.notebookrunner import (
NotebookRunnerConfig,
NotebookRunnerOptions,
)
from ..models.flock import FlockConfig, FlockData, FlockSummary
from ..models.user import AuthenticatedUser, User, UserSpec
from ..storage.gafaelfawr import GafaelfawrStorage
Expand Down Expand Up @@ -130,6 +134,25 @@ async def stop(self) -> None:
awaits = [m.stop() for m in self._monkeys.values()]
await asyncio.gather(*awaits)

def signal_refresh(self) -> None:
"""Signal all the monkeys to refresh their busniess."""
self._logger.info("Signaling monkeys to refresh")
for monkey in self._monkeys.values():
monkey.signal_refresh()

def uses_repo(self, repo_url: str, repo_branch: str) -> bool:
match self._config:
case FlockConfig(
business=NotebookRunnerConfig(
options=NotebookRunnerOptions(
repo_url=url,
repo_branch=branch,
)
)
) if (url, branch) == (repo_url, repo_branch):
return True
return False

def _create_monkey(self, user: AuthenticatedUser) -> Monkey:
"""Create a monkey that will run as a given user."""
return Monkey(
Expand Down
Loading

0 comments on commit 188df6a

Please sign in to comment.