Skip to content

Commit

Permalink
Merge pull request #393 from aldbr/main_FIX_sb-assignment-edge-cases
Browse files Browse the repository at this point in the history
fix(db): assigning non existing sandbox or multiple times the same sandbox
  • Loading branch information
chrisburr authored Feb 20, 2025
2 parents 4db5094 + e03d727 commit 0bbb215
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 9 deletions.
10 changes: 10 additions & 0 deletions diracx-core/src/diracx/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ def __init__(self, pfn: str, se_name: str, detail: str | None = None):
)


class SandboxAlreadyAssignedError(Exception):
def __init__(self, pfn: str, se_name: str, detail: str | None = None):
self.pfn: str = pfn
self.se_name: str = se_name
super().__init__(
f"Sandbox with {pfn} and {se_name} already assigned"
+ (" ({detail})" if detail else "")
)


class JobError(Exception):
def __init__(self, job_id, detail: str | None = None):
self.job_id: int = job_id
Expand Down
12 changes: 10 additions & 2 deletions diracx-db/src/diracx/db/sql/sandbox_metadata/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import Executable, delete, insert, literal, select, update
from sqlalchemy.exc import IntegrityError, NoResultFound

from diracx.core.exceptions import SandboxNotFoundError
from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError
from diracx.core.models import SandboxInfo, SandboxType, UserInfo
from diracx.db.sql.utils import BaseSQLDB, utcnow

Expand Down Expand Up @@ -135,10 +135,18 @@ async def assign_sandbox_to_jobs(
stmt = insert(SBEntityMapping).from_select(
["SBId", "EntityId", "Type"], select_sb_id
)
await self.conn.execute(stmt)
try:
await self.conn.execute(stmt)
except IntegrityError as e:
raise SandboxAlreadyAssignedError(pfn, se_name) from e

stmt = update(SandBoxes).where(SandBoxes.SEPFN == pfn).values(Assigned=True)
result = await self.conn.execute(stmt)
if result.rowcount == 0:
# If the update didn't affect any row, the sandbox doesn't exist
# It means the previous insert didn't have any effect
raise SandboxNotFoundError(pfn, se_name)

assert result.rowcount == 1

async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
Expand Down
23 changes: 16 additions & 7 deletions diracx-routers/src/diracx/routers/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydantic_settings import SettingsConfigDict
from pyparsing import Any

from diracx.core.exceptions import SandboxNotFoundError
from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError
from diracx.core.models import (
SandboxInfo,
SandboxType,
Expand Down Expand Up @@ -267,12 +267,21 @@ async def assign_sandbox_to_job(
"""Map the pfn as output sandbox to job."""
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id])
short_pfn = pfn.split("|", 1)[-1]
await sandbox_metadata_db.assign_sandbox_to_jobs(
jobs_ids=[job_id],
pfn=short_pfn,
sb_type=SandboxType.Output,
se_name=settings.se_name,
)
try:
await sandbox_metadata_db.assign_sandbox_to_jobs(
jobs_ids=[job_id],
pfn=short_pfn,
sb_type=SandboxType.Output,
se_name=settings.se_name,
)
except SandboxNotFoundError as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"
) from e
except (SandboxAlreadyAssignedError, AssertionError) as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox already assigned"
) from e


@router.delete("/{job_id}/sandbox")
Expand Down
61 changes: 61 additions & 0 deletions diracx-routers/tests/jobs/test_sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,64 @@ def test_get_empty_job_sandboxes(normal_user_client: TestClient):
r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox")
assert r.status_code == 200
assert r.json() == {"Input": [None], "Output": [None]}


def test_assign_nonexisting_sb_to_job(normal_user_client: TestClient):
    """Assigning a sandbox PFN that was never registered must yield a 400."""
    # A job is required as the target of the assignment.
    jdls = [TEST_JDL]
    submit_response = normal_user_client.post("/api/jobs/jdl", json=jdls)
    assert submit_response.status_code == 200, submit_response.json()
    assert len(submit_response.json()) == len(jdls)
    job_id = submit_response.json()[0]["JobID"]

    # The request itself is well-formed; the PFN is one this test never
    # registered, so the assignment is expected to be rejected.
    unknown_pfn = "/S3/pathto/vo/vo_group/user/sha256:55967b0c430058c3105472b1edae6c8987c65bcf01ef58f10a3f5e93948782d8.tar.bz2"
    assign_response = normal_user_client.patch(
        f"/api/jobs/{job_id}/sandbox/output",
        json=unknown_pfn,
    )
    assert assign_response.status_code == 400


def test_assign_sb_to_job_twice(normal_user_client: TestClient):
    """Assigning the same sandbox to the same job a second time must yield a 400."""
    payload = secrets.token_bytes(512)
    digest = hashlib.sha256(payload).hexdigest()

    # Declare a new sandbox; the response carries its upload URL and PFN.
    sandbox_request = {
        "checksum_algorithm": "sha256",
        "checksum": digest,
        "size": len(payload),
        "format": "tar.bz2",
    }
    upload_response = normal_user_client.post(
        "/api/jobs/sandbox", json=sandbox_request
    )
    assert upload_response.status_code == 200, upload_response.text
    upload_info = upload_response.json()
    assert upload_info["url"]
    sandbox_pfn = upload_info["pfn"]
    assert sandbox_pfn.startswith("SB:SandboxSE|/S3/")

    # A job is required as the target of the assignment.
    jdls = [TEST_JDL]
    submit_response = normal_user_client.post("/api/jobs/jdl", json=jdls)
    assert submit_response.status_code == 200, submit_response.json()
    assert len(submit_response.json()) == len(jdls)
    job_id = submit_response.json()[0]["JobID"]

    # First assignment of the sandbox to the job is accepted.
    first_attempt = normal_user_client.patch(
        f"/api/jobs/{job_id}/sandbox/output",
        json=sandbox_pfn,
    )
    assert first_attempt.status_code == 200

    # Repeating the exact same assignment is refused.
    second_attempt = normal_user_client.patch(
        f"/api/jobs/{job_id}/sandbox/output",
        json=sandbox_pfn,
    )
    assert second_attempt.status_code == 400

0 comments on commit 0bbb215

Please sign in to comment.