-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSQSManagement.py
73 lines (63 loc) · 2.44 KB
/
SQSManagement.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from asyncio import constants
import boto3
import logging
import constants
import json
import os
from botocore.exceptions import ClientError
# Default queue names, read from the ``constants`` module.
REQUEST_QUEUE_NAME = constants.AWS_SQS_REQUEST_QUEUE_NAME
RESPONSE_QUEUE_NAME = constants.AWS_SQS_RESPONSE_QUEUE_NAME

# Module-wide SQS client, built once at import time from the region and
# credentials held in ``constants``; the helper functions in this module
# all go through it.
client = boto3.client(
    'sqs',
    region_name=constants.REGION_NAME,
    aws_access_key_id=constants.AWS_ACCESS_KEY_ID,
    aws_secret_access_key=constants.AWS_ACCESS_KEY_SECRET,
)
def create_SQS_queue(SQS_QUEUE_NAME=REQUEST_QUEUE_NAME):
    """Create an SQS queue and return its URL.

    The queue is created with fixed attributes: ``DelaySeconds`` 15,
    ``MaximumMessageSize`` 262144, ``VisibilityTimeout`` 60 and
    ``MessageRetentionPeriod`` 86400.

    Args:
        SQS_QUEUE_NAME: Name of the queue to create. Defaults to
            ``REQUEST_QUEUE_NAME``.

    Returns:
        The ``QueueUrl`` value from the ``create_queue`` response, or
        ``None`` when the queue could not be created (the failure is
        logged, not raised).
    """
    queue_attributes = {
        'DelaySeconds': '15',
        'MaximumMessageSize': '262144',
        'VisibilityTimeout': '60',
        'MessageRetentionPeriod': '86400',
    }
    try:
        queue = client.create_queue(
            QueueName=SQS_QUEUE_NAME,
            Attributes=queue_attributes,
        )
        return queue['QueueUrl']
    except Exception:
        # Stay best-effort for callers, but no longer trap KeyboardInterrupt /
        # SystemExit, and keep the traceback so the real cause is visible.
        logging.exception(
            "Unable to create a SQS queue with the given name, recheck the queue name")
        return None
def get_queue_url(queue_name=REQUEST_QUEUE_NAME):
    """Return the URL of the queue called ``queue_name``.

    Args:
        queue_name: Name of the queue to look up. Defaults to
            ``REQUEST_QUEUE_NAME``.

    Returns:
        The ``QueueUrl`` entry of the client's ``get_queue_url`` response.
    """
    lookup = client.get_queue_url(QueueName=queue_name)
    return lookup['QueueUrl']
def send_message(queueUrl, msg):
    """Send ``msg`` to the queue at ``queueUrl``.

    Args:
        queueUrl: URL of the destination queue.
        msg: Message body, passed to the client as ``MessageBody``.

    Returns:
        None. The ``MessageId`` from the response is logged at debug level.
    """
    send_result = client.send_message(MessageBody=msg, QueueUrl=queueUrl)
    message_id = send_result.get('MessageId')
    logging.debug(message_id)
def receive_message(queueUrl, *, max_messages=10, wait_seconds=10,
                    visibility_timeout=123):
    """Receive a batch of messages from the queue at ``queueUrl``.

    Args:
        queueUrl: URL of the queue to read from.
        max_messages: Passed as ``MaxNumberOfMessages``. Defaults to 10.
        wait_seconds: Passed as ``WaitTimeSeconds``. Defaults to 10.
        visibility_timeout: Passed as ``VisibilityTimeout``. Defaults to 123.

    Returns:
        The ``Messages`` list from the response, or an empty list when the
        response has no ``Messages`` key. Callers typically read each
        message's ``"Body"`` and ``"ReceiptHandle"`` entries (the latter is
        what ``delete_message`` expects).
    """
    response = client.receive_message(
        QueueUrl=queueUrl,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=wait_seconds,
        VisibilityTimeout=visibility_timeout,
    )
    return response.get('Messages', [])
def delete_message(QueueUrl, receipt_handle):
    """Delete a previously received message from a queue.

    Args:
        QueueUrl: URL of the queue that holds the message.
        receipt_handle: The ``ReceiptHandle`` of the message to delete.

    Returns:
        None. The raw client response is logged at debug level.
    """
    response = client.delete_message(
        QueueUrl=QueueUrl,
        ReceiptHandle=receipt_handle,
    )
    # Report through logging (as the sibling helpers do) instead of writing
    # the raw response to stdout.
    logging.debug("delete_message response %s", response)
def numberOfMessagesInQueue(queue_name=REQUEST_QUEUE_NAME):
    """Return the approximate number of messages in a queue.

    Args:
        queue_name: Name of the queue to inspect. Defaults to
            ``REQUEST_QUEUE_NAME``.

    Returns:
        The ``ApproximateNumberOfMessages`` attribute as an ``int``, or
        ``-1`` when the response carries no such attribute.
    """
    response = client.get_queue_attributes(
        QueueUrl=get_queue_url(queue_name),
        AttributeNames=['ApproximateNumberOfMessages'],
    )
    # Use .get() so a response without 'Attributes' (or without the count)
    # yields the -1 fallback instead of raising KeyError.
    attributes = response.get('Attributes') or {}
    raw_count = attributes.get('ApproximateNumberOfMessages')
    number = int(raw_count) if raw_count else -1
    logging.debug("numberOfMessagesInQueue %s %s", queue_name, number)
    return number