forked from samirsalman/AmazonOffers-TelegramBot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
109 lines (86 loc) · 3.04 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from sre_parse import CATEGORIES
from typing import Dict, List
import telegram
from amazon_api import search_items
from create_messages import create_item_html
import time
from datetime import datetime
from itertools import chain
import random
from consts import *
import logging
logging.basicConfig(level=logging.INFO)
# ********** Author: Samir Salman **********
# Search keyword configuration.
# Maps each category name to the list of keywords that run_bot passes to
# search_items for that category: {CATEGORY_NAME: [KEYWORD, ...]}.
categories: Dict[str, List[str]] = {"Electronics": ["Televisori"]}
def is_active() -> bool:
    """Tell whether the current hour falls inside the posting window.

    The hour is taken from ``datetime.now()`` (no timezone is passed). The
    window is exclusive at both ends: the hour must be strictly greater than
    ``MIN_HOUR`` and strictly less than ``MAX_HOUR``.

    Returns:
        True when the bot is allowed to post right now, False otherwise.
    """
    current_hour = datetime.now().hour
    return MIN_HOUR < current_hour and current_hour < MAX_HOUR
def send_consecutive_messages(list_of_struct: List[str]) -> List[str]:
    """Post the first two messages of ``list_of_struct`` and return the rest.

    The list is consumed in pairs: the element at an even index is sent as
    the message text and the following element as its ``reply_markup``.
    Messages are sent to ``CHANNEL_NAME`` with HTML parse mode.

    NOTE(review): this function reads the module-level name ``bot`` rather
    than taking the bot as a parameter, so ``bot`` must be bound at module
    level before it is called.

    Args:
        list_of_struct: Flat list of alternating text / reply-markup entries.
            It must hold at least four elements; with fewer, an IndexError
            is raised (possibly after the first message was already sent).

    Returns:
        The remaining entries, i.e. ``list_of_struct[4:]``.
    """
    # Offsets of the two (text, reply_markup) pairs to post, in order.
    for offset in (0, 2):
        bot.send_message(
            chat_id=CHANNEL_NAME,
            text=list_of_struct[offset],
            reply_markup=list_of_struct[offset + 1],
            parse_mode=telegram.ParseMode.HTML,
        )
    return list_of_struct[4:]
# run bot function
def run_bot(
    bot: telegram.Bot,
    categories: Dict[str, List[str]],
    retry_delay_seconds: float = 60,
) -> None:
    """Run the search-and-post loop forever.

    Each cycle searches pages 1-9 of every keyword in every category,
    shuffles the collected items, renders them with ``create_item_html`` and
    then posts them in batches through ``send_consecutive_messages`` while
    ``is_active()`` is true.

    Args:
        bot: Telegram bot instance. NOTE(review): it is not used directly in
            this function; ``send_consecutive_messages`` reads the
            module-level ``bot`` instead.
        categories: Mapping of category name to the keywords to search in it.
        retry_delay_seconds: Seconds to wait after a failed cycle before
            starting the next one. Without this pause a persistent failure
            (for example in ``search_items``) would be retried in a tight
            loop.

    Returns:
        Never returns under normal operation; the loop has no exit.
    """
    # start loop
    while True:
        try:
            items_full = []

            # iterate over categories, keywords and result pages
            for category, keywords in categories.items():
                for keyword in keywords:
                    for page in range(1, 10):
                        items = search_items(keyword, category, item_page=page)
                        # api time limit for another http request is 1 second
                        time.sleep(1)
                        items_full.extend(items)

            logging.info(f'{5 * "*"} Requests Completed {5 * "*"}')

            # shuffle so posts are not grouped by keyword/page
            random.shuffle(items_full)

            # creating html message, you can find more information in create_messages.py
            res = create_item_html(items_full)

            # one batch consumes four entries, so stop when fewer remain
            while len(res) > 3:
                if not is_active():
                    logging.info(
                        f'{5 * "*"} Inactive Bot, between {MIN_HOUR}AM and {MAX_HOUR}PM {5 * "*"}'
                    )
                    # check again in five minutes
                    time.sleep(60 * 5)
                    continue

                try:
                    # Sending two consecutive messages
                    logging.info(f'{5 * "*"} Sending posts to channel {5 * "*"}')
                    res = send_consecutive_messages(res)
                except Exception:
                    # Drop the failed batch and move straight on to the next
                    # one (no pause), logging the traceback for diagnosis.
                    logging.exception("Failed to send posts to channel, skipping batch")
                    res = res[4:]
                    continue

                # Sleep for PAUSE_MINUTES between successful batches
                time.sleep(60 * PAUSE_MINUTES)

        except Exception:
            # Top-level boundary: keep the bot alive, but record the traceback
            # and back off so a persistent error does not spin the loop.
            logging.exception(
                "Bot cycle failed, retrying in %s seconds", retry_delay_seconds
            )
            time.sleep(retry_delay_seconds)
if __name__ == "__main__":
    # Script entry point. The bot is bound to the module-level name `bot`
    # because send_consecutive_messages reads that global when posting.
    bot = telegram.Bot(token=TOKEN)

    # Start the endless search-and-post loop.
    run_bot(bot, categories)