Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: initial implementation of headless API (Bug 1941363) #194

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies = [
"datadog",
"django ~= 5.0",
"django-libsass",
"django-ninja",
"django-storages[google]",
"django_compressor",
"jinja2",
Expand Down Expand Up @@ -72,6 +73,7 @@ addopts = "--cov --cov-report html"
testpaths = [
"src/lando/api",
"src/lando/dockerflow",
"src/lando/headless_api",
"src/lando/main/tests",
"src/lando/tests",
"src/lando/ui/tests",
Expand Down
819 changes: 480 additions & 339 deletions requirements.txt

Large diffs are not rendered by default.

203 changes: 203 additions & 0 deletions src/lando/api/legacy/workers/automation_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import logging
from typing import Any

import kombu
from django.db import transaction

from lando.api.legacy.notifications import (
notify_user_of_landing_failure,
)
from lando.api.legacy.workers.base import Worker
from lando.headless_api.api import (
Action,
AddBranchAction,
AddCommitAction,
AutomationActionException,
MergeOntoAction,
TagAction,
)
from lando.headless_api.models.automation_job import (
AutomationJob,
)
from lando.main.models.landing_job import LandingJobAction, LandingJobStatus
from lando.main.scm.exceptions import (
SCMInternalServerError,
SCMLostPushRace,
SCMPushTimeoutException,
TreeApprovalRequired,
TreeClosed,
)
from lando.utils.tasks import phab_trigger_repo_update

logger = logging.getLogger(__name__)


def map_to_pydantic_action(action_type: str, action_data: dict[str, Any]) -> Action:
    """Build the `Action` object registered for `action_type`.

    `action_data` is expanded into keyword arguments for the selected action
    class. An `action_type` that is not in the table below raises `KeyError`.
    """
    # Action type identifier -> class implementing that action.
    action_classes = {
        "add-commit": AddCommitAction,
        "merge-onto": MergeOntoAction,
        "tag": TagAction,
        "add-branch": AddBranchAction,
    }
    action_class = action_classes[action_type]
    return action_class(**action_data)


class AutomationWorker(Worker):
"""Worker to execute automation jobs.

This worker runs `AutomationJob`s on enabled repositories.
These jobs include a set of actions which are to be run on the repository,
and then pushed to the destination repo.
"""

def __init__(self, *args, **kwargs):
    """Initialise the base worker, then set up automation-specific state."""
    super().__init__(*args, **kwargs)

    # Holds the boolean returned by the most recent `run_automation_job`
    # call; stays `None` until a job has been run.
    self.last_job_finished = None

    self.refresh_active_repos()

def loop(self):
    """Run one iteration of the worker loop.

    Logs the enabled repos, refreshes the active repos when their count no
    longer matches the enabled repos, backs off when the previous job did
    not complete, and then fetches the next `AutomationJob` for the enabled
    repos so it can be processed.
    """
    enabled_repos = self.worker_instance.enabled_repos
    # The whole message must be a single f-string: with implicit literal
    # concatenation each piece needs its own `f` prefix, otherwise the
    # `{...}` placeholder is logged verbatim instead of the repo list.
    logger.debug(f"{len(enabled_repos)} enabled repos: {enabled_repos}")

    # Refresh repos if there is a mismatch in active vs. enabled repos.
    if len(self.active_repos) != len(self.enabled_repos):
        self.refresh_active_repos()

    # `False` means the previous job asked to be retried; wait before
    # picking up work again and re-sync the repos afterwards.
    if self.last_job_finished is False:
        logger.info("Last job did not complete, sleeping.")
        self.throttle(self.worker_instance.sleep_seconds)
        self.refresh_active_repos()

    with transaction.atomic():
        job = AutomationJob.next_job(repositories=self.enabled_repos).first()
Comment on lines +73 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only thing that needs to be atomic? Seems odd to have a single line here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece is copy/pasted from the landing worker implementation, and we have the same single line there. In the unused "mixin" we have more lines in the atomic context manager... So I'm not entirely sure. 😅


if job is None:
self.throttle(self.worker_instance.sleep_seconds)
return

with job.processing():
job.status = LandingJobStatus.IN_PROGRESS
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a copy/paste error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "this"?

job.attempts += 1
job.save()

# Make sure the status and attempt count are updated in the database
logger.info("Starting automation job", extra={"id": job.id})
self.last_job_finished = self.run_automation_job(job)
logger.info("Finished processing automation job", extra={"id": job.id})

def run_automation_job(self, job: AutomationJob) -> bool:
"""Run an automation job.

Updates the job's target repo, runs each of the job's actions in turn and
pushes the result.

Returns:
`True` when the job has reached a state that should not be retried (it
landed, or failed permanently); `False` when the job was deferred after
a temporary failure and should be tried again.
"""
Copy link
Member

@shtrom shtrom Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Run an automation job."""
"""Run an automation job.
Returns True if the job is in a permanent state and should not be retried.
"""

repo = job.target_repo
scm = repo.scm

with scm.for_push(job.requester_email):
repo_pull_info = f"tree: {repo.tree}, pull path: {repo.pull_path}"
try:
# TODO should we always update to the latest pull_path for a repo?
# or perhaps we need to specify some commit SHA?
scm.update_repo(repo.pull_path)
Comment on lines +98 to +100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment it cleans the repo back to a known state, and pulls the default branch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: now it creates a new branch in git. This may be a problem we want to come back to?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, update_repo accepts an optional target_cset in case we want to provide a commit SHA

def update_repo(self, pull_path: str, target_cset: Optional[str] = None) -> str:

except SCMInternalServerError as e:
message = (
f"`Temporary error ({e.__class__}) "
f"encountered while pulling from {repo_pull_info}"
)
logger.exception(message)
job.transition_status(LandingJobAction.DEFER, message=message)

# Try again, this is a temporary failure.
return False
except Exception as e:
message = f"Unexpected error while fetching repo from {repo.pull_path}."
logger.exception(message)
job.transition_status(
LandingJobAction.FAIL,
message=message + f"\n{e}",
)
return True

# Run each action for the job.
actions = job.actions.all()
for action_row in actions:
# Turn the row action into a Pydantic action.
action = map_to_pydantic_action(action_row.action_type, action_row.data)

# Execute the action locally.
try:
action.process(job, repo, scm, action_row.order)
except AutomationActionException as exc:
logger.exception(exc.message)
job.transition_status(exc.job_status, message=exc.message)
return exc.is_fatal

repo_push_info = f"tree: {repo.tree}, push path: {repo.push_path}"
try:
scm.push(
repo.push_path,
push_target=repo.push_target,
force_push=repo.force_push,
)
except (
TreeClosed,
TreeApprovalRequired,
SCMLostPushRace,
SCMPushTimeoutException,
SCMInternalServerError,
) as e:
message = (
f"`Temporary error ({e.__class__}) "
f"encountered while pushing to {repo_push_info}"
)
logger.exception(message)
job.transition_status(LandingJobAction.DEFER, message=message)
return False # Try again, this is a temporary failure.
except Exception as e:
message = f"Unexpected error while pushing to {repo.push_path}.\n{e}"
logger.exception(message)
job.transition_status(
LandingJobAction.FAIL,
message=message,
)
return True # Do not try again, this is a permanent failure.

# Get the changeset hash of the first node.
commit_id = scm.head_ref()

job.transition_status(LandingJobAction.LAND, commit_id=commit_id)

# Trigger update of repo in Phabricator so patches are closed quicker.
# Especially useful on low-traffic repositories.
if repo.phab_identifier:
self.phab_trigger_repo_update(repo.phab_identifier)

return True

@staticmethod
def notify_user_of_landing_failure(job: AutomationJob):
    """Convenience wrapper around the module-level `notify_user_of_landing_failure`.

    Args:
        job: the `AutomationJob` whose requester email, landing job
            identifier, error and ID are used as the notification
            parameters.
    """
    # NOTE(review): relies on `job.landing_job_identifier` being available on
    # `AutomationJob` -- confirm against the model.
    notification_args = (
        job.requester_email,
        job.landing_job_identifier,
        job.error,
        job.id,
    )
    notify_user_of_landing_failure(*notification_args)

@staticmethod
def phab_trigger_repo_update(phab_identifier: str):
    """Wrapper around `phab_trigger_repo_update` for convenience.

    Queues the repo update task and tolerates a
    `kombu.exceptions.OperationalError` raised while doing so: the failure
    is logged and the call returns normally.

    Args:
        phab_identifier: `str` to be passed to Phabricator to identify
            repo.
    """
    try:
        # Send a Phab repo update task to Celery.
        phab_trigger_repo_update.apply_async(args=(phab_identifier,))
    except kombu.exceptions.OperationalError:
        # Log the exception but continue gracefully.
        # The repo will eventually update.
        # `logger.exception` already attaches the active exception and its
        # traceback, so a single call records everything once.
        logger.exception("Failed sending repo update task to Celery.")
Loading