forked from aws-deadline/deadline-cloud-test-fixtures
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob_attachment_manager.py
108 lines (93 loc) · 3.55 KB
/
job_attachment_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from __future__ import annotations
from dataclasses import InitVar, dataclass, field
import os
from botocore.client import BaseClient
from botocore.exceptions import ClientError, WaiterError
from .deadline.client import DeadlineClient
from .deadline import (
Farm,
Queue,
)
from .models import (
JobAttachmentSettings,
JobRunAsUser,
PosixSessionUser,
WindowsSessionUser,
)
from uuid import uuid4
@dataclass
class JobAttachmentManager:
"""
Responsible for setting up and tearing down job attachment test resources
"""
s3_client: BaseClient
deadline_client: DeadlineClient
stage: InitVar[str]
account_id: InitVar[str]
bucket_name: str
farm_id: str
queue: Queue | None = field(init=False, default=None)
queue_with_no_settings: Queue | None = field(init=False, default=None)
bucket_root_prefix: str = os.environ.get("JA_TEST_ROOT_PREFIX", "") + str(
uuid4()
) # Set the bucket root prefix for this test run to an UUID to avoid async test execution race conditions
def deploy_resources(self):
"""
Deploy all of the resources needed for job attachment integration tests.
"""
try:
self.queue = Queue.create(
client=self.deadline_client,
display_name="job_attachments_test_queue",
farm=Farm(self.farm_id),
job_run_as_user=JobRunAsUser(
posix=PosixSessionUser("", ""),
runAs="WORKER_AGENT_USER",
windows=WindowsSessionUser("", ""),
),
job_attachments=JobAttachmentSettings(
bucket_name=self.bucket_name, root_prefix=self.bucket_root_prefix
),
)
self.queue_with_no_settings = Queue.create(
client=self.deadline_client,
display_name="job_attachments_test_no_settings_queue",
farm=Farm(self.farm_id),
job_run_as_user=JobRunAsUser(
posix=PosixSessionUser("", ""),
runAs="WORKER_AGENT_USER",
windows=WindowsSessionUser("", ""),
),
)
except (ClientError, WaiterError):
# If anything goes wrong, rollback
self.cleanup_resources()
raise
def empty_bucket_under_root_prefix(self):
"""
Empty the bucket between session runs
"""
try:
# List up all objects and their versions in the bucket
version_list = self.s3_client.list_object_versions(
Bucket=self.bucket_name, Prefix=self.bucket_root_prefix
)
object_list = version_list.get("Versions", []) + version_list.get("DeleteMarkers", [])
# Delete all objects and versions
for obj in object_list:
self.s3_client.delete_object(
Bucket=self.bucket_name, Key=obj["Key"], VersionId=obj.get("VersionId", None)
)
except ClientError as e:
if e.response["Error"]["Message"] != "The specified bucket does not exist":
raise
def cleanup_resources(self):
"""
Cleanup all of the resources that the test used
"""
self.empty_bucket_under_root_prefix()
if self.queue:
self.queue.delete(client=self.deadline_client)
if self.queue_with_no_settings:
self.queue_with_no_settings.delete(client=self.deadline_client)