Skip to content

Commit

Permalink
Add REST API tests to check access rights when changing source/target…
Browse files Browse the repository at this point in the history
… storage (#8479)
  • Loading branch information
Marishka17 authored Oct 1, 2024
1 parent 99669f5 commit 7551d6c
Show file tree
Hide file tree
Showing 14 changed files with 1,413 additions and 57 deletions.
13 changes: 10 additions & 3 deletions tests/python/rest_api/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

import io
import json
import operator
import os
import xml.etree.ElementTree as ET
import zipfile
from copy import deepcopy
from datetime import datetime
from http import HTTPStatus
from io import BytesIO
from itertools import groupby, product
Expand Down Expand Up @@ -1270,10 +1272,15 @@ def test_can_update_assignee_updated_date_on_assignee_updates(
job["id"], patched_job_write_request={"assignee": new_assignee_id}
)

if new_assignee_id == old_assignee_id:
assert updated_job.assignee_updated_date == job["assignee_updated_date"]
op = operator.eq if new_assignee_id == old_assignee_id else operator.ne

if isinstance(updated_job.assignee_updated_date, datetime):
assert op(
str(updated_job.assignee_updated_date.isoformat()).replace("+00:00", "Z"),
job["assignee_updated_date"],
)
else:
assert updated_job.assignee_updated_date != job["assignee_updated_date"]
assert op(updated_job.assignee_updated_date, job["assignee_updated_date"])

if new_assignee_id:
assert updated_job.assignee.id == new_assignee_id
Expand Down
6 changes: 5 additions & 1 deletion tests/python/rest_api/test_labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,11 @@ def test_task_label_update_triggers_nested_task_and_job_update(
):
# Checks for regressions against the issue https://github.com/cvat-ai/cvat/issues/6871

task = next(t for t in tasks_wlc if t["jobs"]["count"] and t["labels"]["count"])
task = next(
t
for t in tasks_wlc
if t["jobs"]["count"] and t["labels"]["count"] and not t["project_id"]
)
task_labels = [l for l in labels if l.get("task_id") == task["id"]]
nested_jobs = [j for j in jobs if j["task_id"] == task["id"]]

Expand Down
99 changes: 83 additions & 16 deletions tests/python/rest_api/test_organizations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
from copy import deepcopy
from http import HTTPStatus
from typing import Optional

import pytest
from cvat_sdk.api_client.api_client import ApiClient, Endpoint
Expand Down Expand Up @@ -38,12 +39,29 @@ class TestMetadataOrganizations:
(None, "supervisor", True),
],
)
def test_can_send_options_request(self, privilege, role, is_member, find_users, organizations):
def test_can_send_options_request(
self,
privilege: Optional[str],
role: Optional[str],
is_member: Optional[bool],
find_users,
organizations,
):
exclude_org = None if is_member else self._ORG
org = self._ORG if is_member else None
user = find_users(privilege=privilege, role=role, org=org, exclude_org=exclude_org)[0][
"username"
]

filters = {}

for key, value in {
"privilege": privilege,
"role": role,
"org": org,
"exclude_org": exclude_org,
}.items():
if value is not None:
filters[key] = value

user = find_users(**filters)[0]["username"]

response = options_method(user, f"organizations")
assert response.status_code == HTTPStatus.OK
Expand All @@ -70,13 +88,29 @@ class TestGetOrganizations:
],
)
def test_can_see_specific_organization(
self, privilege, role, is_member, is_allow, find_users, organizations
self,
privilege: Optional[str],
role: Optional[str],
is_member: Optional[bool],
is_allow: bool,
find_users,
organizations,
):
exclude_org = None if is_member else self._ORG
org = self._ORG if is_member else None
user = find_users(privilege=privilege, role=role, org=org, exclude_org=exclude_org)[0][
"username"
]

filters = {}

for key, value in {
"privilege": privilege,
"role": role,
"org": org,
"exclude_org": exclude_org,
}.items():
if value is not None:
filters[key] = value

user = find_users(**filters)[0]["username"]

response = get_method(user, f"organizations/{self._ORG}")
if is_allow:
Expand Down Expand Up @@ -157,14 +191,30 @@ def expected_data(self, organizations, request_data):
],
)
def test_can_update_specific_organization(
self, privilege, role, is_member, is_allow, find_users, request_data, expected_data
self,
privilege: Optional[str],
role: Optional[str],
is_member: Optional[bool],
is_allow: bool,
find_users,
request_data,
expected_data,
):
exclude_org = None if is_member else self._ORG
org = self._ORG if is_member else None
user = find_users(privilege=privilege, role=role, org=org, exclude_org=exclude_org)[0][
"username"
]

filters = {}

for key, value in {
"privilege": privilege,
"role": role,
"org": org,
"exclude_org": exclude_org,
}.items():
if value is not None:
filters[key] = value

user = find_users(**filters)[0]["username"]
response = patch_method(user, f"organizations/{self._ORG}", request_data)

if is_allow:
Expand Down Expand Up @@ -193,12 +243,29 @@ class TestDeleteOrganizations:
("worker", None, False, False),
],
)
def test_can_delete(self, privilege, role, is_member, is_allow, find_users):
def test_can_delete(
self,
privilege: Optional[str],
role: Optional[str],
is_member: Optional[bool],
is_allow: bool,
find_users,
):
exclude_org = None if is_member else self._ORG
org = self._ORG if is_member else None
user = find_users(privilege=privilege, role=role, org=org, exclude_org=exclude_org)[0][
"username"
]

filters = {}

for key, value in {
"privilege": privilege,
"role": role,
"org": org,
"exclude_org": exclude_org,
}.items():
if value is not None:
filters[key] = value

user = find_users(**filters)[0]["username"]

response = delete_method(user, f"organizations/{self._ORG}")

Expand Down
Loading

0 comments on commit 7551d6c

Please sign in to comment.