generated from XpressAI/xai-component-library-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_components.py
261 lines (207 loc) · 7.14 KB
/
aws_components.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
from xai_components.base import InArg, OutArg, Component, xai_component
import boto3
@xai_component
class SQSSendMessage(Component):
    """Send a single message to an Amazon SQS queue.

    ##### inPorts:
    - queue_url (str): URL of the SQS queue that receives the message.
    - message_body (str): Text payload of the message.

    ##### outPorts:
    - message_id (str): The `MessageId` field of the `send_message` response.
    """
    queue_url: InArg[str]
    message_body: InArg[str]
    message_id: OutArg[str]

    def execute(self, ctx) -> None:
        client = boto3.client('sqs')
        request = {
            'QueueUrl': self.queue_url.value,
            'MessageBody': self.message_body.value,
        }
        result = client.send_message(**request)
        # Only the message ID is exposed; the rest of the response is dropped.
        self.message_id.value = result['MessageId']
@xai_component
class SQSReceiveMessage(Component):
    """Fetch a batch of messages from an Amazon SQS queue.

    Issues one `receive_message` call that asks for at most 10 messages
    (`MaxNumberOfMessages=10`) and waits up to 10 seconds for them
    (`WaitTimeSeconds=10`).

    ##### inPorts:
    - queue_url (str): URL of the SQS queue to read from.

    ##### outPorts:
    - messages (list): The `Messages` entries of the response, or an empty
        list when the response has no `Messages` key.
    """
    queue_url: InArg[str]
    messages: OutArg[list]

    def execute(self, ctx) -> None:
        client = boto3.client('sqs')
        result = client.receive_message(
            WaitTimeSeconds=10,
            MaxNumberOfMessages=10,
            QueueUrl=self.queue_url.value,
        )
        # The response carries no 'Messages' key when nothing was received.
        received = result.get('Messages', [])
        self.messages.value = received
@xai_component
class SQSDeleteMessage(Component):
    """Remove a previously received message from an Amazon SQS queue.

    ##### inPorts:
    - queue_url (str): URL of the SQS queue holding the message.
    - receipt_handle (str): Receipt handle identifying the message to delete.
    """
    queue_url: InArg[str]
    receipt_handle: InArg[str]

    def execute(self, ctx) -> None:
        client = boto3.client('sqs')
        request = {
            'QueueUrl': self.queue_url.value,
            'ReceiptHandle': self.receipt_handle.value,
        }
        # The delete response is not exposed on any port.
        client.delete_message(**request)
@xai_component
class S3UploadFile(Component):
    """Component to upload a local file to an S3 bucket.

    ##### inPorts:
    - bucket_name (str): The name of the S3 bucket.
    - file_path (str): The local path of the file to upload.
    - object_name (str): The key to assign to the file in S3.

    ##### outPorts:
    - response (dict): Summary of the completed upload with the keys
        `Bucket`, `Key` and `Filename`. It is only set when the upload
        call returned without raising.
    """
    bucket_name: InArg[str]
    file_path: InArg[str]
    object_name: InArg[str]
    response: OutArg[dict]

    def execute(self, ctx) -> None:
        s3 = boto3.client('s3')
        local_path = self.file_path.value
        bucket = self.bucket_name.value
        key = self.object_name.value

        # boto3's upload_file() returns None and reports failure by raising,
        # so storing its return value left the `response` port set to None
        # even though the port is declared (and documented) as a dict.
        s3.upload_file(Filename=local_path, Bucket=bucket, Key=key)

        self.response.value = {
            'Bucket': bucket,
            'Key': key,
            'Filename': local_path,
        }
@xai_component
class S3DownloadFile(Component):
    """Component to download an object from an S3 bucket to a local file.

    ##### inPorts:
    - bucket_name (str): The name of the S3 bucket.
    - object_name (str): The key of the object in S3.
    - file_path (str): The local path to save the downloaded file.

    ##### outPorts:
    - response (dict): Summary of the completed download with the keys
        `Bucket`, `Key` and `Filename`. It is only set when the download
        call returned without raising.
    """
    bucket_name: InArg[str]
    object_name: InArg[str]
    file_path: InArg[str]
    response: OutArg[dict]

    def execute(self, ctx) -> None:
        s3 = boto3.client('s3')
        bucket = self.bucket_name.value
        key = self.object_name.value
        local_path = self.file_path.value

        # boto3's download_file() returns None and reports failure by raising,
        # so storing its return value left the `response` port set to None
        # even though the port is declared (and documented) as a dict.
        s3.download_file(Bucket=bucket, Key=key, Filename=local_path)

        self.response.value = {
            'Bucket': bucket,
            'Key': key,
            'Filename': local_path,
        }
@xai_component
class S3ListObjects(Component):
    """Component to list all objects in an S3 bucket.

    ##### inPorts:
    - bucket_name (str): The name of the S3 bucket.

    ##### outPorts:
    - objects (list): The `Contents` entries of every result page, in the
        order returned by S3. Empty list when the bucket has no objects.
    """
    bucket_name: InArg[str]
    objects: OutArg[list]

    def execute(self, ctx) -> None:
        s3 = boto3.client('s3')

        # A single list_objects_v2 call returns at most 1000 keys, so larger
        # buckets were silently truncated. The paginator follows the
        # continuation tokens until every page has been read.
        paginator = s3.get_paginator('list_objects_v2')
        objects = []
        for page in paginator.paginate(Bucket=self.bucket_name.value):
            # Pages of an empty bucket carry no 'Contents' key.
            objects.extend(page.get('Contents', []))

        self.objects.value = objects
@xai_component
class DynamoDBPutItem(Component):
    """Write one item into a DynamoDB table.

    ##### inPorts:
    - table_name (str): Name of the target table.
    - item (dict): The item to store, passed unchanged as `Item`
        (e.g., {'id': {'S': '123'}, 'name': {'S': 'John'}}).

    ##### outPorts:
    - response (dict): The value returned by the `put_item` call.
    """
    table_name: InArg[str]
    item: InArg[dict]
    response: OutArg[dict]

    def execute(self, ctx) -> None:
        client = boto3.client('dynamodb')
        request = {
            'TableName': self.table_name.value,
            'Item': self.item.value,
        }
        self.response.value = client.put_item(**request)
@xai_component
class DynamoDBGetItem(Component):
    """Read one item from a DynamoDB table by its key.

    ##### inPorts:
    - table_name (str): Name of the table to read from.
    - key (dict): Key of the wanted item, passed unchanged as `Key`
        (e.g., {'id': {'S': '123'}}).

    ##### outPorts:
    - item (dict): The `Item` entry of the response, or an empty dict when
        the response has no `Item` key.
    """
    table_name: InArg[str]
    key: InArg[dict]
    item: OutArg[dict]

    def execute(self, ctx) -> None:
        client = boto3.client('dynamodb')
        request = {
            'TableName': self.table_name.value,
            'Key': self.key.value,
        }
        result = client.get_item(**request)
        # Fall back to {} when the response has no 'Item' entry.
        found = result.get('Item', {})
        self.item.value = found
@xai_component
class DynamoDBDeleteItem(Component):
    """Remove one item from a DynamoDB table by its key.

    ##### inPorts:
    - table_name (str): Name of the table to delete from.
    - key (dict): Key of the item to remove, passed unchanged as `Key`
        (e.g., {'id': {'S': '123'}}).

    ##### outPorts:
    - response (dict): The value returned by the `delete_item` call.
    """
    table_name: InArg[str]
    key: InArg[dict]
    response: OutArg[dict]

    def execute(self, ctx) -> None:
        client = boto3.client('dynamodb')
        request = {
            'TableName': self.table_name.value,
            'Key': self.key.value,
        }
        self.response.value = client.delete_item(**request)
@xai_component
class GlueStartJobRun(Component):
    """Kick off a run of an AWS Glue job.

    ##### inPorts:
    - job_name (str): Name of the Glue job to run.
    - arguments (dict): Optional job arguments; an empty dict is sent when
        this port holds no value (or any other falsy value).

    ##### outPorts:
    - job_run_id (str): The `JobRunId` field of the `start_job_run` response.
    """
    job_name: InArg[str]
    arguments: InArg[dict]
    job_run_id: OutArg[str]

    def execute(self, ctx) -> None:
        client = boto3.client('glue')
        name = self.job_name.value
        job_args = self.arguments.value
        if not job_args:
            # The port is optional; send an empty mapping instead of None.
            job_args = {}
        result = client.start_job_run(JobName=name, Arguments=job_args)
        self.job_run_id.value = result['JobRunId']
@xai_component
class GetGlueJobStatus(Component):
    """Look up the current state of an AWS Glue job run.

    ##### inPorts:
    - job_name (str): Name of the Glue job.
    - job_run_id (str): ID of the job run to inspect.

    ##### outPorts:
    - job_status (str): The `JobRunState` value found under `JobRun` in the
        `get_job_run` response.
    """
    job_name: InArg[str]
    job_run_id: InArg[str]
    job_status: OutArg[str]

    def execute(self, ctx) -> None:
        client = boto3.client('glue')
        request = {
            'JobName': self.job_name.value,
            'RunId': self.job_run_id.value,
        }
        result = client.get_job_run(**request)
        run_info = result['JobRun']
        self.job_status.value = run_info['JobRunState']