Skip to content

Commit

Permalink
BE: quiz refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
andreipradan committed Nov 12, 2024
1 parent feebf03 commit 61cafd0
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 94 deletions.
195 changes: 113 additions & 82 deletions src/mainframe/bots/management/commands/run_quiz_bot_polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def get_reply_markup(options, ci, qi):
InlineKeyboardButton(o, callback_data=f"{ci} {qi} {o}")
for o in options[2:]
],
[
InlineKeyboardButton("♻️", callback_data="restart"),
InlineKeyboardButton("✅", callback_data="end"),
],
]
)

Expand All @@ -51,143 +55,170 @@ def ask_question(self, update, quiz):
else:
qi += 1

if len(questions) - 1 < ci + 1:
update.effective_chat.send_message("Done!")
if ci + 1 > len(questions) or (
ci + 1 == len(questions) and qi + 1 > len(questions[ci]["questions"])
):
scores = " | ".join(f"{p}: {s}" for p, s in quiz["players"].items())
update.effective_chat.send_message(
f"Done!\nResults: {scores}\n" f"Hit /restart for a new one"
)
return

category = questions[ci]["category"]
question = questions[ci]["questions"][qi]["q"]
options = questions[ci]["questions"][qi]["options"]
current_question = (ci + 1) * (qi + 1)
total_questions = 6 * len(questions)
update.effective_chat.send_message(
f"❓<b>{category}</b> - "
f"[{current_question}/{total_questions}] "
f"❓[{ci + 1}/6] <b>{category}</b> - "
f"[{qi + 1}/6] "
f"{question}\n\nAnswers: [0/{len(quiz)}]",
reply_markup=get_reply_markup(options, ci, qi),
parse_mode=telegram.ParseMode.HTML,
)
quiz["current"] = {"ci": ci, "qi": qi, "answers": {}}
self.redis.set("quiz", quiz)

def handle_callback_query(self, update: Update, *_, **__):
query = update.callback_query
query.answer()

if query.data == "end":
return query.edit_message_text("See you next time!")

if query.data == "restart":
return self.handle_restart(update, *_, **__)

category, question, *answer = query.data.split()
user = update.effective_user.full_name
quiz = self.redis.get("quiz")
if (answers := quiz["current"]["answers"]) and user in answers:
self.logger("User %s already answered", user)
return

answers[user] = " ".join(answer)
questions = quiz.get("questions")
options = questions[int(category)]["questions"][int(question)]["options"]
players = quiz["players"]
category_text = query.message.text.split("-")[0]
text = query.message.text.replace(category_text, f"<b>{category_text}</b>")
answers_count = f"Answers: [{len(answers)}/{len(players)}]"
category_and_question = text.split("\n\n")[0]
try:
query.edit_message_text(
category_and_question + "\n\n" + answers_count,
reply_markup=get_reply_markup(options, category, question),
parse_mode=telegram.ParseMode.HTML,
)
except telegram.error.BadRequest as e:
self.logger.exception(e)
return query.edit_message_text(
f"{text}\nOld message - not acting",
parse_mode=telegram.ParseMode.HTML,
reply_markup=get_reply_markup(options, category, question),
)

if set(answers) == set(players):
current_questions = questions[int(category)]["questions"]
results = " | ".join(f"{p}: {a}" for p, a in answers.items())
correct_answer = current_questions[int(question)]["a"]
for player, answer in answers.items():
if answer == correct_answer:
players[player] += 1
answer = f"Correct: <b>{correct_answer}</b>"
query.edit_message_text(
f"{category_and_question}\n{answer}\n{results}",
parse_mode=telegram.ParseMode.HTML,
)
self.ask_question(update, quiz)

def handle_categories(self, update: Update, *_, **__):
message = update.effective_message
quiz = self.redis.get("quiz")
if categories := update.message.text.lstrip("/categories "): # noqa: B005
if categories := message.text.lstrip("/categories "): # noqa: B005
quiz["categories"] = categories
return self.redis.set("quiz", quiz)

if not (categories := quiz.get("categories")):
return self.reply(
update,
message,
"Categoriile nu sunt definite.\n"
"Se pot seta cu: /categories categorie1, categorie2",
)
self.reply(update, f"Categoriile sunt: {', '.join(categories)}")
self.reply(message, f"Categoriile sunt: {', '.join(categories)}")

def handle_new(self, update: Update, *_, **__):
self.reset(update)
self.reset(update.effective_message)

def handle_ready(self, update: Update, *_, **__):
new_player = update.effective_user.full_name
quiz = self.redis.get("quiz") or {}
players = quiz.get("players", {})
if new_player in players:
return self.reply(update, "You're already registered")
return self.reply(update.effective_message, "You're already registered")
players[new_player] = 0
self.redis.set("quiz", quiz)
self.reply(
update,
update.effective_message,
f"{new_player} joined\nCurrent players: {', '.join(list(players))}",
)

def handle_regenerate_questions(self, update: Update, *_, **__):
quiz = self.redis.get("quiz")
self.regenerate_questions(update.effective_message, quiz)

def handle_restart(self, update: Update, *_, **__):
quiz = self.redis.get("quiz")
quiz["current"] = {}
for player, _ in quiz["players"].items():
quiz["players"][player] = 0

self.redis.set("quiz", quiz)
text = (
"Scores cleared, hit /start for a new quiz\n"
"Hit /regenerate for new questions"
)
try:
update.effective_message.edit_text(text)
except telegram.error.BadRequest:
update.effective_message.bot.send_message(update.effective_chat.id, text)

def handle_start(self, update: Update, *_, **__):
message = update.effective_message
if not (quiz := self.redis.get("quiz")):
quiz = self.reset(update)
quiz = self.reset(message)

if not quiz.get("players"):
return self.reply(update, "No players registered. Register doing /ready ")
return self.reply(message, "No players registered. Register doing /ready ")

self.reply(update, "Starting in 3...")
self.reply(message, "Starting in 3...")
time.sleep(1)
self.reply(update, "Starting in 2...")
self.reply(message, "Starting in 2...")
time.sleep(1)
self.reply(update, "Starting in 1...")
self.reply(message, "Starting in 1...")
time.sleep(1)

self.ask_question(update, quiz)

def handle_status(self, update: Update, *_, **__):
if not (quiz := self.redis.get("quiz")):
return self.reply(update, "No players registered. Register doing /ready ")

status = "\n".join(f"{player}: {score}" for player, score in quiz.items())
self.reply(update, f"<b>Current status</b>\n{status}")

def handle_callback_query(self, update: Update, *_, bot, **__):
query = update.callback_query
query.answer()

category, question, *answer = query.data.split()
user = update.effective_user.full_name
quiz = self.redis.get("quiz")
if user in (answers := quiz["current"].get("answers")):
self.logger("User %s already answered", user)
return

answers[user] = " ".join(answer)
questions = quiz.get("questions")
options = questions[int(category)]["questions"][int(question)]["options"]
players = quiz["players"]
category_text = query.message.text.split("-")[0]
text = query.message.text.replace(category_text, f"<b>{category_text}</b>")
text = (
text.split("\n\n")[0] + "\n\n" + f"Answers: [{len(answers)}/{len(players)}]"
)
query.edit_message_text(
text,
reply_markup=get_reply_markup(options, category, question),
parse_mode=telegram.ParseMode.HTML,
)

if set(answers) == set(players):
current_questions = questions[int(category)]["questions"]
results = " | ".join(f"{p}: {a}" for p, a in answers.items())
correct_answer = current_questions[int(question)]["a"]
for player, answer in answers.items():
if answer == correct_answer:
players[player] += 1
scores = " | ".join(f"{p}: {s}" for p, s in players.items())

answer = f"Correct answer: <b>{correct_answer}</b>"
bot.send_message(
update.effective_chat.id,
f"{answer}\nAnswers: {results}\nScores: {scores}",
return self.reply(
update.effective_message,
"No players registered. Register doing /ready ",
)
self.ask_question(update, quiz)

def handle_regenerate_questions(self, update: Update, *_, **__):
quiz = self.redis.get("quiz")
self.regenerate_questions(update, quiz)

def handle_restart(self, update: Update, *_, **__):
quiz = self.redis.get("quiz")
quiz["current"] = {}
for player, _ in quiz["players"].items():
quiz["players"][player] = 0

self.redis.set("quiz", quiz)
self.reply(update, "Restarted")
status = "\n".join(f"{player}: {score}" for player, score in quiz.items())
self.reply(update.effective_message, f"<b>Current status</b>\n{status}")

def regenerate_questions(self, update: Update, quiz):
def regenerate_questions(self, message: telegram.Message, quiz):
if not (categories := quiz.get("categories")):
categories = DEFAULT_CATEGORIES

self.reply(update, "Generating questions...")
self.reply(message, "Generating questions...")

try:
response = generate_content(
prompt=f"""
Genereaza un quiz standard cu urmatoarele categorii:
Genereaza un quiz standard cu intrebari noi pentru
urmatoarele categorii:
{', '.join(categories)}
Fiecare categorie continand cate 6 intrebari.
Pentru fiecare intrebare vreau cate 4 variante de raspuns
Expand Down Expand Up @@ -226,23 +257,23 @@ def regenerate_questions(self, update: Update, quiz):
)
except GeminiError as e:
self.logger.exception(e)
return self.reply(update, "Got an error trying to process your message")
return self.reply(message, "Got an error trying to process your message")

try:
quiz["questions"] = json.loads(response)["questions"]
except (json.JSONDecodeError, IndexError) as e:
self.logger.exception(e)
return self.reply(update, "Eroare la generarea intrebarilor")
return self.reply(message, "Eroare la generarea intrebarilor")

self.redis.set("quiz", quiz)
return quiz

def reset(self, update):
def reset(self, message: telegram.Message):
quiz = {"categories": DEFAULT_CATEGORIES, "current": {}, "players": {}}
quiz = self.regenerate_questions(update, quiz)
quiz = self.regenerate_questions(message, quiz)
self.redis.set(key="quiz", value=quiz)
self.reply(
update,
message,
"S-a creat un quiz nou\n"
"Sa te alaturi: /ready\n"
"Categoriile: /categories\n"
Expand Down
20 changes: 8 additions & 12 deletions src/mainframe/clients/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from mainframe.bots.models import Bot
from mainframe.clients.logs import get_default_logger
from mainframe.clients.storage import RedisClient
from telegram import Update


def is_whitelisted(func):
Expand All @@ -20,7 +19,7 @@ def wrapper(self, update, context, *args, **kwargs):
return
bot.last_called_on = timezone.now()
bot.save()
return func(self, update, *args, bot=bot, context=context, **kwargs)
return func(self, update, *args, context=context, **kwargs)

return wrapper

Expand All @@ -45,12 +44,9 @@ def __init__(self, logger=None):
self.logger = logger or get_default_logger(__name__)
self.redis = RedisClient(self.logger)

def reply(self, update: Update, text: str, **kwargs):
if not update.message:
self.logger.error(
"Can't reply - no message to reply to: '%s'", update.to_dict()
)
return
def reply(self, message: telegram.Message, text: str, **kwargs):
if not message:
return self.logger.error("Can't reply - message is empty")

default_kwargs = {
"disable_notification": True,
Expand All @@ -59,19 +55,19 @@ def reply(self, update: Update, text: str, **kwargs):
**kwargs,
}
try:
update.message.reply_text(text, **default_kwargs)
message.reply_text(text, **default_kwargs)
except telegram.error.TelegramError as e:
if "can't find end of the entity" in str(e):
location = int(e.message.split()[-1])
self.logger.warning(
"Error parsing markdown - skipping '%s'", text[location]
)
return self.reply(
update, f"{text[:location]}\\{text[location]}{text[location + 1:]}"
message, f"{text[:location]}\\{text[location]}{text[location + 1:]}"
)
self.logger.warning("Couldn't send markdown '%s'. (%s)", text, e)
try:
update.message.reply_text(text)
message.reply_text(text)
except telegram.error.TelegramError as e:
self.logger.exception("Error sending unformatted message. (%s)", e)
update.message.reply_text("Got an error trying to send response")
message.reply_text("Got an error trying to send response")

0 comments on commit 61cafd0

Please sign in to comment.