-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloadreviews.py
75 lines (61 loc) · 1.94 KB
/
loadreviews.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""
Dynamo to SQS
"""
import boto3
import json
import sys
import os
import boto3
from io import StringIO
import logging
DYNAMODB = boto3.resource('dynamodb')
TABLE = "reviews"
QUEUE = "reviews_queue"
SQS = boto3.client("sqs")
#SETUP LOGGING
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)
def scan_table(table):
"""Scans table and return results"""
LOG.info(f"Scanning Table {table}")
producer_table = DYNAMODB.Table(table)
response = producer_table.scan()
items = response['Items']
LOG.info(f"Found {len(items)} Items")
return items
def send_sqs_msg(msg, queue_name, delay=0):
"""Send SQS Message
Expects an SQS queue_name and msg in a dictionary format.
Returns a response dictionary.
"""
queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
queue_send_log_msg = "Send message to queue url: %s, with body: %s" %\
(queue_url, msg)
LOG.info(queue_send_log_msg)
json_msg = json.dumps(msg)
response = SQS.send_message(
QueueUrl=queue_url,
MessageBody=json_msg,
DelaySeconds=delay)
queue_send_log_msg_resp = "Message Response: %s for queue url: %s" %\
(response, queue_url)
LOG.info(queue_send_log_msg_resp)
return response
def send_emissions(table, queue_name):
"""Send Emissions"""
items = scan_table(table=table)
for item in items:
LOG.info(f"Sending item {item} to queue: {queue_name}")
response = send_sqs_msg(item, queue_name=queue_name)
LOG.debug(response)
def lambda_handler(event, context):
"""
Lambda entrypoint
"""
extra_logging = {"table": TABLE, "queue": QUEUE}
LOG.info(f"event {event}, context {context}", extra=extra_logging)
send_emissions(table=TABLE, queue_name=QUEUE)