-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnotify.py
133 lines (104 loc) · 5.76 KB
/
notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
'''
@Author Shon Paz
@Date 16/02/2020
'''
import boto3
import json
import botocore
import argparse
class Notifier:
    """Configure RGW bucket notifications for real-time message queuing.

    An sns-like topic is created with a kafka, amqp (rabbitmq) or http push
    endpoint, and the bucket is then configured to publish object creation
    and removal events to that topic.
    """

    def __init__(self, argv=None):
        """Parse the command-line options and build the SNS and S3 clients.

        Args:
            argv: optional list of argument strings to parse. ``None`` (the
                default) lets argparse read the process command line, which
                is what ``Notifier()`` has always done.
        """
        # creates all needed arguments for the program to run
        parser = argparse.ArgumentParser()
        parser.add_argument('-e', '--endpoint-url', help="endpoint url for s3 object storage", required=True)
        parser.add_argument('-a', '--access-key', help='access key for s3 object storage', required=True)
        parser.add_argument('-s', '--secret-key', help='secret key for s3 object storage', required=True)
        parser.add_argument('-b', '--bucket-name', help='s3 bucket name', required=True)
        parser.add_argument('-ke', '--kafka-endpoint', help='kafka endpoint in which rgw will send notifications to', required=False)
        parser.add_argument('-ae', '--amqp-endpoint', help='amqp endpoint in which rgw will send notifications to', required=False)
        parser.add_argument('-he', '--http-endpoint', help='http endpoint in which rgw will send notifications to', required=False)
        parser.add_argument('-t', '--topic', help='topic name in which rgw will send notifications to', required=True)
        parser.add_argument('-f', '--filter', help='filter such as prefix, suffix, metadata or tags', required=False)
        parser.add_argument('-o', '--opaque', help='opaque data that will be sent in the notifications', required=False)
        parser.add_argument('-x', '--exchange', help='amqp exchange name (mandatory for amqp endpoints)', required=False)
        parser.add_argument('-n', '--notification', help='notification name, allows for setting multiple notifications on the same bucket', required=False, default="configuration")

        # parsing all arguments
        args = parser.parse_args(argv)

        # building instance vars
        self.endpoint_url = args.endpoint_url
        self.access_key = args.access_key
        self.secret_key = args.secret_key
        self.bucket_name = args.bucket_name
        self.kafka_endpoint = args.kafka_endpoint
        self.http_endpoint = args.http_endpoint
        self.amqp_endpoint = args.amqp_endpoint
        self.topic = args.topic
        self.filter = args.filter
        self.opaque = args.opaque
        self.exchange = args.exchange
        self.notification = args.notification

        # both clients talk to the same endpoint with the same credentials
        client_options = {
            'endpoint_url': self.endpoint_url,
            'aws_access_key_id': self.access_key,
            'aws_secret_access_key': self.secret_key,
            'region_name': 'default',
            'config': botocore.client.Config(signature_version='s3'),
        }
        self.sns = boto3.client('sns', **client_options)
        self.s3 = boto3.client('s3', **client_options)

    @staticmethod
    def _is_not_found(error):
        """Return True when a client error response reports a 404 / missing bucket."""
        response = getattr(error, 'response', None) or {}
        code = str(response.get('Error', {}).get('Code', ''))
        status = response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        return code in ('404', 'NoSuchBucket', 'NotFound') or status == 404

    def create_sns_topic(self):
        """Create an sns-like topic with the configured push endpoint.

        The first endpoint provided, checked in the order kafka, amqp, http,
        is used. The resulting topic ARN is stored in ``self.topic_arn``.

        Raises:
            ValueError: if no push endpoint was provided, or if an amqp
                endpoint was provided without an exchange name.
        """
        attributes = {}
        if self.opaque:
            attributes['OpaqueData'] = self.opaque

        # in case wanted MQ endpoint is kafka
        if self.kafka_endpoint:
            attributes['push-endpoint'] = 'kafka://' + self.kafka_endpoint
            attributes['kafka-ack-level'] = 'broker'
        # in case wanted MQ endpoint is rabbitmq
        elif self.amqp_endpoint:
            # fail early with a clear message instead of sending an empty exchange
            if not self.exchange:
                raise ValueError("please configure an exchange name for the amqp endpoint!")
            attributes['push-endpoint'] = 'amqp://' + self.amqp_endpoint
            attributes['amqp-exchange'] = self.exchange
            attributes['amqp-ack-level'] = 'broker'
        # in case wanted MQ endpoint is http
        elif self.http_endpoint:
            attributes['push-endpoint'] = 'http://' + self.http_endpoint
        # in case wanted MQ endpoint is not provided by the user
        else:
            raise ValueError("please configure a push endpoint!")

        # creates the wanted sns-like topic on RGW and gets the topic's ARN
        self.topic_arn = self.sns.create_topic(Name=self.topic, Attributes=attributes)['TopicArn']

    def configure_bucket_notification(self):
        """Configure bucket notification for object creation and removal.

        The bucket is created when it does not exist yet. ``create_sns_topic``
        must have been called first, because the configuration references
        ``self.topic_arn``.

        Raises:
            botocore.exceptions.ClientError: if checking the bucket fails for
                any reason other than the bucket being absent.
            ValueError: if the user-supplied filter is not valid JSON
                (raised by ``json.loads``).
        """
        # creates a bucket if it doesn't exist; other failures (for example a
        # permission error) are surfaced rather than masked by a create attempt
        try:
            self.s3.head_bucket(Bucket=self.bucket_name)
        except botocore.exceptions.ClientError as error:
            if not self._is_not_found(error):
                raise
            self.s3.create_bucket(Bucket=self.bucket_name)

        # initial dictionary
        topic_configuration = {
            "Id": self.notification,
            "TopicArn": self.topic_arn,
            "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
        }

        # in case the user has provided a filter to use
        if self.filter:
            topic_configuration['Filter'] = json.loads(self.filter)

        # pushes the notification configuration to the bucket
        self.s3.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration={"TopicConfigurations": [topic_configuration]},
        )
if __name__ == '__main__':
    # build the notifier from the command-line arguments
    bucket_notifier = Notifier()
    # register the sns-like topic that pushes to the chosen MQ endpoint
    bucket_notifier.create_sns_topic()
    # attach the object creation and removal notification to the bucket
    bucket_notifier.configure_bucket_notification()