Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move org deletion to background job with access to backend ops classes #2098

Merged
merged 13 commits into from
Oct 10, 2024
79 changes: 72 additions & 7 deletions backend/btrixcloud/background_jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""k8s background jobs"""

import asyncio
import os
from datetime import datetime
from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
from uuid import UUID
@@ -19,6 +20,7 @@
BgJobType,
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
@@ -273,6 +275,51 @@ async def create_delete_replica_job(
)
return None

async def create_delete_org_job(
    self,
    org: Organization,
    existing_job_id: Optional[str] = None,
) -> Optional[str]:
    """Create background job to delete org and its data.

    If ``existing_job_id`` is given, this is a retry: the prior attempt's
    timing is archived into ``previousAttempts`` and the job record is
    reset to a fresh started/unfinished state.

    Returns the job id, or None if the job could not be started
    (job creation is best-effort; failure is logged, not raised).
    """
    try:
        job_id = await self.crawl_manager.run_delete_org_job(
            oid=str(org.id),
            backend_image=os.environ.get("BACKEND_IMAGE", ""),
            pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
            existing_job_id=existing_job_id,
        )
        if existing_job_id:
            # Retry path: keep a history of earlier attempts, then reset
            delete_org_job = await self.get_background_job(existing_job_id, org.id)
            previous_attempt = {
                "started": delete_org_job.started,
                "finished": delete_org_job.finished,
            }
            if delete_org_job.previousAttempts:
                delete_org_job.previousAttempts.append(previous_attempt)
            else:
                delete_org_job.previousAttempts = [previous_attempt]
            delete_org_job.started = dt_now()
            delete_org_job.finished = None
            delete_org_job.success = None
        else:
            delete_org_job = DeleteOrgJob(
                id=job_id,
                oid=org.id,
                started=dt_now(),
            )

        # upsert so a retried job updates the same document
        await self.jobs.find_one_and_update(
            {"_id": job_id}, {"$set": delete_org_job.to_dict()}, upsert=True
        )

        return job_id
    # pylint: disable=broad-exception-caught
    except Exception as exc:
        print(f"warning: delete org job could not be started: {exc}")
        return None

async def job_finished(
self,
job_id: str,
@@ -316,10 +363,13 @@ async def job_finished(
)

async def get_background_job(
self, job_id: str, oid: UUID
) -> Union[CreateReplicaJob, DeleteReplicaJob]:
self, job_id: str, oid: Optional[UUID] = None
) -> Union[CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id, "oid": oid}
query: dict[str, object] = {"_id": job_id}
if oid:
query["oid"] = oid

res = await self.jobs.find_one(query)
if not res:
raise HTTPException(status_code=404, detail="job_not_found")
@@ -331,9 +381,10 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.CREATE_REPLICA:
return CreateReplicaJob.from_dict(data)

return DeleteReplicaJob.from_dict(data)
if data["type"] == BgJobType.DELETE_REPLICA:
return DeleteReplicaJob.from_dict(data)

# return BackgroundJob.from_dict(data)
return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
self,
@@ -432,9 +483,8 @@ async def retry_background_job(
if job.success:
raise HTTPException(status_code=400, detail="job_already_succeeded")

file = await self.get_replica_job_file(job, org)

if job.type == BgJobType.CREATE_REPLICA:
file = await self.get_replica_job_file(job, org)
primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
primary_endpoint, bucket_suffix = self.strip_bucket(
primary_storage.endpoint_url
@@ -452,6 +502,7 @@ async def retry_background_job(
)

if job.type == BgJobType.DELETE_REPLICA:
file = await self.get_replica_job_file(job, org)
await self.create_delete_replica_job(
org,
file,
@@ -461,6 +512,12 @@ async def retry_background_job(
existing_job_id=job_id,
)

if job.type == BgJobType.DELETE_ORG:
await self.create_delete_org_job(
org,
existing_job_id=job_id,
)

return {"success": True}

async def retry_failed_background_jobs(
@@ -523,6 +580,14 @@ async def get_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
    """Get a background job from any org (superuser only)"""
    # docstring previously said "Retry failed background jobs" (copy-paste
    # from the retry endpoint); this endpoint retrieves a single job, and
    # it returns a job model, not SuccessResponse, hence response_model=AnyJob
    if not user.is_superuser:
        raise HTTPException(status_code=403, detail="Not Allowed")

    return await ops.get_background_job(job_id)

@router.post("/{job_id}/retry", response_model=SuccessResponse)
async def retry_background_job(
job_id: str,
28 changes: 28 additions & 0 deletions backend/btrixcloud/crawlmanager.py
Original file line number Diff line number Diff line change
@@ -110,6 +110,34 @@ async def run_replica_job(

return job_id

async def run_delete_org_job(
    self,
    oid: str,
    backend_image: str,
    pull_policy: str,
    existing_job_id: Optional[str] = None,
):
    """run job to delete org and all of its data"""

    # reuse the caller-supplied id on retries; otherwise mint a unique one
    job_id = (
        existing_job_id
        if existing_job_id
        else f"delete-org-{oid}-{secrets.token_hex(5)}"
    )

    template_params = {
        "id": job_id,
        "oid": oid,
        "job_type": BgJobType.DELETE_ORG.value,
        "backend_image": backend_image,
        "pull_policy": pull_policy,
    }

    job_yaml = self.templates.env.get_template("background_job.yaml").render(
        template_params
    )
    await self.create_from_yaml(job_yaml)

    return job_id

async def create_crawl_job(
self,
crawlconfig: CrawlConfig,
2 changes: 1 addition & 1 deletion backend/btrixcloud/main.py
Original file line number Diff line number Diff line change
@@ -243,7 +243,7 @@ def main() -> None:

init_uploads_api(*base_crawl_init)

org_ops.set_ops(base_crawl_ops, profiles, coll_ops)
org_ops.set_ops(base_crawl_ops, profiles, coll_ops, background_job_ops)

user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

144 changes: 144 additions & 0 deletions backend/btrixcloud/main_bg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
""" entrypoint module for background jobs """

import asyncio
import os
import sys
import traceback
from uuid import UUID

from .crawlmanager import CrawlManager
from .db import init_db
from .emailsender import EmailSender

# from .utils import register_exit_handler
from .models import BgJobType

from .basecrawls import BaseCrawlOps
from .invites import InviteOps
from .users import init_user_manager
from .orgs import OrgOps
from .colls import CollectionOps
from .crawlconfigs import CrawlConfigOps
from .crawls import CrawlOps
from .profiles import ProfileOps
from .storages import StorageOps
from .webhooks import EventWebhookOps
from .background_jobs import BackgroundJobOps
from .pages import PageOps

job_type = os.environ.get("BG_JOB_TYPE")
oid = os.environ.get("OID")


# ============================================================================
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals
async def main():
    """Entrypoint for a background job pod: wire up the backend ops
    classes, then dispatch on the BG_JOB_TYPE env var.

    Returns a process exit code (0 on success, 1 on failure).
    """
    email = EmailSender()

    dbclient, mdb = init_db()

    invite_ops = InviteOps(mdb, email)

    user_manager = init_user_manager(mdb, email, invite_ops)

    org_ops = OrgOps(mdb, invite_ops, user_manager)

    event_webhook_ops = EventWebhookOps(mdb, org_ops)

    # CrawlManager needs the k8s API, so refuse to run outside a cluster
    if not os.environ.get("KUBERNETES_SERVICE_HOST"):
        print(
            "Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
            Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
        )
        sys.exit(1)

    crawl_manager = CrawlManager()

    storage_ops = StorageOps(org_ops, crawl_manager)

    background_job_ops = BackgroundJobOps(
        mdb, email, user_manager, org_ops, crawl_manager, storage_ops
    )

    profile_ops = ProfileOps(
        mdb, org_ops, crawl_manager, storage_ops, background_job_ops
    )

    crawl_config_ops = CrawlConfigOps(
        dbclient,
        mdb,
        user_manager,
        org_ops,
        crawl_manager,
        profile_ops,
    )

    coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops)

    base_crawl_ops = BaseCrawlOps(
        mdb,
        user_manager,
        org_ops,
        crawl_config_ops,
        coll_ops,
        storage_ops,
        event_webhook_ops,
        background_job_ops,
    )

    crawl_ops = CrawlOps(
        crawl_manager,
        mdb,
        user_manager,
        org_ops,
        crawl_config_ops,
        coll_ops,
        storage_ops,
        event_webhook_ops,
        background_job_ops,
    )

    page_ops = PageOps(mdb, crawl_ops, org_ops, storage_ops)

    base_crawl_ops.set_page_ops(page_ops)
    crawl_ops.set_page_ops(page_ops)

    # NOTE: the original called background_job_ops.set_ops twice with
    # different first args; the second call clobbered the first. Keep the
    # (crawl_ops, profile_ops) call, matching main.py's wiring.
    background_job_ops.set_ops(crawl_ops, profile_ops)

    org_ops.set_ops(base_crawl_ops, profile_ops, coll_ops, background_job_ops)

    user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

    crawl_config_ops.set_coll_ops(coll_ops)

    # Run job
    if job_type == BgJobType.DELETE_ORG:
        if not oid:
            print("Org id missing, quitting")
            return 1
        org = await org_ops.get_org_by_id(UUID(oid))
        if not org:
            print("Org id invalid, quitting")
            return 1

        try:
            await org_ops.delete_org_and_data(org, user_manager)
            return 0
        # pylint: disable=broad-exception-caught
        except Exception:
            traceback.print_exc()
            return 1

    print(f"Provided job type {job_type} not currently supported")
    return 1


# ============================================================================
if __name__ == "__main__":
    return_code = asyncio.run(main())
    sys.exit(return_code)
19 changes: 18 additions & 1 deletion backend/btrixcloud/models.py
Original file line number Diff line number Diff line change
@@ -1966,6 +1966,7 @@ class BgJobType(str, Enum):

CREATE_REPLICA = "create-replica"
DELETE_REPLICA = "delete-replica"
DELETE_ORG = "delete-org"


# ============================================================================
@@ -2004,10 +2005,19 @@ class DeleteReplicaJob(BackgroundJob):
replica_storage: StorageRef


# ============================================================================
class DeleteOrgJob(BackgroundJob):
    """Model for tracking background jobs that delete an org and its data"""

    # fixed discriminator value so this variant is distinguishable in AnyJob
    type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG


# ============================================================================
# Union of all job types, for response model

AnyJob = RootModel[Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob]]
AnyJob = RootModel[
Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob, DeleteOrgJob]
]


# ============================================================================
@@ -2227,6 +2237,13 @@ class DeletedResponse(BaseModel):
deleted: bool


# ============================================================================
class DeletedResponseId(DeletedResponse):
    """Response for delete API endpoints that return a background job id"""

    # id of the background job performing the deletion
    # NOTE(review): declared non-optional, but create_delete_org_job can
    # return None when the job fails to start — confirm callers handle that
    id: str


# ============================================================================
class DeletedResponseQuota(DeletedResponse):
"""Response for delete API endpoints"""
15 changes: 10 additions & 5 deletions backend/btrixcloud/orgs.py
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@
PAUSED_PAYMENT_FAILED,
REASON_PAUSED,
ACTIVE,
DeletedResponse,
DeletedResponseId,
UpdatedResponse,
AddedResponse,
AddedResponseId,
@@ -93,8 +93,10 @@
from .colls import CollectionOps
from .profiles import ProfileOps
from .users import UserManager
from .background_jobs import BackgroundJobOps
else:
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = UserManager = object
InviteOps = BaseCrawlOps = ProfileOps = CollectionOps = object
BackgroundJobOps = UserManager = object


DEFAULT_ORG = os.environ.get("DEFAULT_ORG", "My Organization")
@@ -150,12 +152,14 @@ def set_ops(
base_crawl_ops: BaseCrawlOps,
profile_ops: ProfileOps,
coll_ops: CollectionOps,
background_job_ops: BackgroundJobOps,
) -> None:
"""Set base crawl ops"""
# pylint: disable=attribute-defined-outside-init
self.base_crawl_ops = base_crawl_ops
self.profile_ops = profile_ops
self.coll_ops = coll_ops
self.background_job_ops = background_job_ops

def set_default_primary_storage(self, storage: StorageRef):
"""set default primary storage"""
@@ -1438,15 +1442,16 @@ async def get_org(
org_out.execMinutesQuotaReached = ops.exec_mins_quota_reached(org)
return org_out

@router.delete("", tags=["organizations"], response_model=DeletedResponse)
@router.delete("", tags=["organizations"], response_model=DeletedResponseId)
async def delete_org(
org: Organization = Depends(org_dep), user: User = Depends(user_dep)
):
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

await ops.delete_org_and_data(org, user_manager)
return {"deleted": True}
job_id = await ops.background_job_ops.create_delete_org_job(org)

return {"deleted": True, "id": job_id}

@router.post("/rename", tags=["organizations"], response_model=UpdatedResponse)
async def rename_org(
36 changes: 34 additions & 2 deletions backend/test/test_z_delete_org.py
Original file line number Diff line number Diff line change
@@ -54,9 +54,41 @@ def test_delete_org_superadmin(admin_auth_headers, default_org_id):
f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers
)
assert r.status_code == 200
assert r.json()["deleted"]
data = r.json()
assert data["deleted"]

job_id = data["id"]

# Check that background job is launched and eventually succeeds
max_attempts = 18
attempts = 1
while True:
try:
r = requests.get(
f"{API_PREFIX}/orgs/all/jobs/{job_id}", headers=admin_auth_headers
)
assert r.status_code == 200
success = r.json()["success"]

if success:
break

if success is False:
assert False

if attempts >= max_attempts:
assert False

time.sleep(10)
except:
pass

attempts += 1

# Ensure org and items got deleted
r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
assert r.status_code == 404

# Ensure items got deleted
for item_id in item_ids:
r = requests.get(
f"{API_PREFIX}/orgs/all/all-crawls/{item_id}/replay.json",
59 changes: 59 additions & 0 deletions chart/app-templates/background_job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: "{{ id }}"
  labels:
    role: "background-job"
    job_type: {{ job_type }}
    btrix.org: {{ oid }}

spec:
  # remove the Job object immediately once it finishes
  ttlSecondsAfterFinished: 0
  backoffLimit: 3
  template:
    spec:
      restartPolicy: Never
      priorityClassName: bg-job
      # fail the whole Job when the container exits with a nonzero code
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: btrixbgjob
              operator: NotIn
              values: [0]

      volumes:
        - name: ops-configs
          secret:
            secretName: ops-configs

      containers:
        - name: btrixbgjob
          image: {{ backend_image }}
          imagePullPolicy: {{ pull_policy }}
          # BG_JOB_TYPE / OID are read by btrixcloud.main_bg to dispatch the job
          env:
            - name: BG_JOB_TYPE
              value: {{ job_type }}

            - name: OID
              value: {{ oid }}

          envFrom:
            - configMapRef:
                name: backend-env-config
            - secretRef:
                name: mongo-auth

          volumeMounts:
            - name: ops-configs
              mountPath: /ops-configs/

          command: ["python3", "-m", "btrixcloud.main_bg"]

          resources:
            limits:
              memory: "200Mi"

            requests:
              memory: "200Mi"
              cpu: "50m"
79 changes: 79 additions & 0 deletions chart/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -72,6 +72,85 @@ data:

LOG_SENT_EMAILS: "{{ .Values.email.log_sent_emails }}"

BACKEND_IMAGE: "{{ .Values.backend_image }}"

BACKEND_IMAGE_PULL_POLICY: "{{ .Values.backend_pull_policy }}"


---
apiVersion: v1
kind: ConfigMap
metadata:
name: backend-env-config
namespace: {{ .Values.crawler_namespace }}

data:
APP_ORIGIN: {{ .Values.ingress.tls | ternary "https" "http" }}://{{ or .Values.ingress.host ( print "localhost:" ( .Values.local_service_port | default 9870 )) }}

CRAWLER_NAMESPACE: {{ .Values.crawler_namespace }}

DEFAULT_NAMESPACE: {{ .Release.Namespace }}

FRONTEND_ORIGIN: {{ .Values.frontend_alias | default "http://browsertrix-cloud-frontend" }}

CRAWLER_FQDN_SUFFIX: ".{{ .Values.crawler_namespace }}.svc.cluster.local"

DEFAULT_ORG: "{{ .Values.default_org }}"

INVITE_EXPIRE_SECONDS: "{{ .Values.invite_expire_seconds }}"

REGISTRATION_ENABLED: "{{ .Values.registration_enabled | default 0 }}"

REGISTER_TO_ORG_ID: "{{ .Values.registration_org_id }}"

ALLOW_DUPE_INVITES: "{{ .Values.allow_dupe_invites | default 0 }}"

JWT_TOKEN_LIFETIME_MINUTES: "{{ .Values.jwt_token_lifetime_minutes | default 60 }}"

DEFAULT_BEHAVIOR_TIME_SECONDS: "{{ .Values.default_behavior_time_seconds }}"

DEFAULT_PAGE_LOAD_TIME_SECONDS: "{{ .Values.default_page_load_time_seconds }}"

DEFAULT_CRAWL_FILENAME_TEMPLATE: "{{ .Values.default_crawl_filename_template }}"

MAX_PAGES_PER_CRAWL: "{{ .Values.max_pages_per_crawl | default 0 }}"

IDLE_TIMEOUT: "{{ .Values.profile_browser_idle_seconds | default 60 }}"

RERUN_FROM_MIGRATION: "{{ .Values.rerun_from_migration }}"

PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes }}"

FAST_RETRY_SECS: "{{ .Values.operator_fast_resync_secs | default 3 }}"

MAX_CRAWL_SCALE: "{{ .Values.max_crawl_scale | default 3 }}"

LOG_FAILED_CRAWL_LINES: "{{ .Values.log_failed_crawl_lines | default 0 }}"

IS_LOCAL_MINIO: "{{ .Values.minio_local }}"

STORAGES_JSON: "/ops-configs/storages.json"

CRAWLER_CHANNELS_JSON: "/ops-configs/crawler_channels.json"

MIN_QA_CRAWLER_IMAGE: "{{ .Values.min_qa_crawler_image }}"

MAX_CRAWLER_MEMORY: "{{ .Values.max_crawler_memory }}"

ENABLE_AUTO_RESIZE_CRAWLERS: "{{ .Values.enable_auto_resize_crawlers }}"

BILLING_ENABLED: "{{ .Values.billing_enabled }}"

SIGN_UP_URL: "{{ .Values.sign_up_url }}"

SALES_EMAIL: "{{ .Values.sales_email }}"

LOG_SENT_EMAILS: "{{ .Values.email.log_sent_emails }}"

BACKEND_IMAGE: "{{ .Values.backend_image }}"

BACKEND_IMAGE_PULL_POLICY: "{{ .Values.backend_pull_policy }}"

---
apiVersion: v1
kind: ConfigMap
13 changes: 13 additions & 0 deletions chart/templates/secrets.yaml
Original file line number Diff line number Diff line change
@@ -34,6 +34,19 @@ data:
crawler_channels.json: {{ .Values.crawler_channels | toJson | b64enc | quote }}


---
apiVersion: v1
kind: Secret
metadata:
name: ops-configs
namespace: {{ $.Values.crawler_namespace }}

type: Opaque
data:
storages.json: {{ .Values.storages | toJson | b64enc | quote }}
crawler_channels.json: {{ .Values.crawler_channels | toJson | b64enc | quote }}


{{- range $storage := .Values.storages }}
---
apiVersion: v1