-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
91 lines (80 loc) · 4.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import json
import time
from bson.objectid import ObjectId
from elasticsearch.helpers import BulkIndexError
from pika.exceptions import AMQPConnectionError
from retry import retry
from config import get_config_by_name
from logger.custom_logging import log, log_error
from services.mongo_service import update_on_search_dump_status, update_on_search_dump_language_status
from transformers.full_catalog import transform_full_on_search_payload_into_default_lang_items
from transformers.incr_catalog import transform_incr_on_search_payload_into_final_items
from transformers.second import get_unique_locations_from_items
from transformers.translation import translate_items_into_target_language
from utils.elasticsearch_utils import add_documents_to_index, init_elastic_search
from utils.json_utils import clean_nones
from utils.mongo_utils import get_mongo_collection, collection_find_one, init_mongo_database
from utils.rabbitmq_utils import create_channel, declare_queue, consume_message, open_connection
from utils.redis_utils import init_redis_cache
def consume_fn(message_string):
    """Consume one RabbitMQ message describing an on_search dump to index.

    The message body is a JSON string with at least:
      - ``doc_id``: hex string of the Mongo ObjectId of the dumped payload
      - ``request_type``: ``"full"`` (complete catalog) or ``"inc"`` (incremental)

    Loads the dumped payload from the ``on_search_dump`` Mongo collection,
    transforms it into item/offer/location documents, indexes them into
    Elasticsearch, and records progress back into Mongo. For "full" requests
    it additionally translates items into each configured language and
    indexes those, tracking a per-language status.

    Never raises: every failure path is logged and persisted as a FAILED
    status so the message is not retried indefinitely.
    """
    doc_id = None
    try:
        # NOTE(review): fixed delay — presumably gives the producer's Mongo
        # write time to become visible before we read it back; confirm.
        time.sleep(2)
        payload = json.loads(message_string)
        log(f"Got the payload {payload}!")
        doc_id = ObjectId(payload["doc_id"])
        collection = get_mongo_collection('on_search_dump')
        on_search_payload = collection_find_one(collection, {"_id": doc_id}, keep_created_at=True)
        if on_search_payload:
            on_search_payload = clean_nones(on_search_payload)
            # Drop the synthetic "id" field so it is not indexed as document data.
            on_search_payload.pop("id", None)
            if payload["request_type"] == "full":
                update_on_search_dump_status(doc_id, "IN-PROGRESS")
                items, offers, locations = transform_full_on_search_payload_into_default_lang_items(on_search_payload)
                add_documents_to_index("items", items)
                add_documents_to_index("offers", offers)
                add_documents_to_index("locations", locations)
                # Default-language indexing is complete; translations below are
                # tracked per-language and do not affect the overall status.
                update_on_search_dump_status(doc_id, "FINISHED")
                for lang in get_config_by_name("LANGUAGE_LIST"):
                    if not lang:
                        continue
                    try:
                        # Translation mutates `items` in place into the target language.
                        translate_items_into_target_language(items, lang)
                        locations = get_unique_locations_from_items(items)
                        add_documents_to_index("items", items)
                        add_documents_to_index("locations", locations)
                        update_on_search_dump_language_status(doc_id, lang, "FINISHED")
                    except BulkIndexError as e:
                        log_error(f"Got error while adding in elasticsearch for {lang}!")
                        update_on_search_dump_language_status(doc_id, lang, "FAILED",
                                                              e.errors[0]['index']['error']['reason'])
                    except Exception as e:
                        log_error(f"Something went wrong with consume function - {e}!")
                        # doc_id is always set by this point; guard kept for
                        # consistency with the outer handler.
                        if doc_id:
                            update_on_search_dump_language_status(doc_id, lang, "FAILED", str(e))
            elif payload["request_type"] == "inc":
                update_on_search_dump_status(doc_id, "IN-PROGRESS")
                items, offers = transform_incr_on_search_payload_into_final_items(on_search_payload)
                add_documents_to_index("items", items)
                add_documents_to_index("offers", offers)
                update_on_search_dump_status(doc_id, "FINISHED")
        else:
            log_error(f"On search payload was not found for {doc_id}!")
            update_on_search_dump_status(doc_id, "FAILED", f"On search payload was not found for {doc_id}!")
    except BulkIndexError as e:
        log_error("Got error while adding in elasticsearch!")
        update_on_search_dump_status(doc_id, "FAILED", e.errors[0]['index']['error']['reason'])
    except Exception as e:
        log_error(f"Something went wrong with consume function - {e}!")
        # doc_id stays None if JSON parsing / ObjectId construction failed,
        # in which case there is no Mongo record to mark as FAILED.
        if doc_id:
            update_on_search_dump_status(doc_id, "FAILED", str(e))
@retry(AMQPConnectionError, delay=5, jitter=(1, 3))
def run_consumer():
    """Initialize backing services and block consuming indexing messages.

    The ``@retry`` decorator re-invokes the whole function (with a jittered
    5s delay) whenever the RabbitMQ connection fails with an
    ``AMQPConnectionError``, so datastore clients are re-initialized too.
    """
    # Bring up the datastores the consumer depends on.
    init_mongo_database()
    init_elastic_search()
    init_redis_cache()

    # Wire up RabbitMQ and hand every message on the queue to consume_fn.
    target_queue = get_config_by_name('ELASTIC_SEARCH_QUEUE_NAME')
    amqp_connection = open_connection()
    amqp_channel = create_channel(amqp_connection)
    declare_queue(amqp_channel, target_queue)
    consume_message(
        amqp_connection,
        amqp_channel,
        queue_name=target_queue,
        consume_fn=consume_fn,
    )
if __name__ == "__main__":
    # Script entry point: start the blocking consumer loop (retries forever
    # on AMQP connection errors via run_consumer's decorator).
    run_consumer()