From 61cafd0cc7cba986141632d114251a56ed9f07aa Mon Sep 17 00:00:00 2001 From: "andrei.pradan" Date: Wed, 13 Nov 2024 00:45:12 +0200 Subject: [PATCH] BE: quiz refactor --- .../commands/run_quiz_bot_polling.py | 195 ++++++++++-------- src/mainframe/clients/bot.py | 20 +- 2 files changed, 121 insertions(+), 94 deletions(-) diff --git a/src/mainframe/bots/management/commands/run_quiz_bot_polling.py b/src/mainframe/bots/management/commands/run_quiz_bot_polling.py index 1665f3ef..4c0ca280 100644 --- a/src/mainframe/bots/management/commands/run_quiz_bot_polling.py +++ b/src/mainframe/bots/management/commands/run_quiz_bot_polling.py @@ -31,6 +31,10 @@ def get_reply_markup(options, ci, qi): InlineKeyboardButton(o, callback_data=f"{ci} {qi} {o}") for o in options[2:] ], + [ + InlineKeyboardButton("♻️", callback_data="restart"), + InlineKeyboardButton("✅", callback_data="end"), + ], ] ) @@ -51,18 +55,21 @@ def ask_question(self, update, quiz): else: qi += 1 - if len(questions) - 1 < ci + 1: - update.effective_chat.send_message("Done!") + if ci + 1 > len(questions) or ( + ci + 1 == len(questions) and qi + 1 > len(questions[ci]["questions"]) + ): + scores = " | ".join(f"{p}: {s}" for p, s in quiz["players"].items()) + update.effective_chat.send_message( + f"Done!\nResults: {scores}\n" f"Hit /restart for a new one" + ) return category = questions[ci]["category"] question = questions[ci]["questions"][qi]["q"] options = questions[ci]["questions"][qi]["options"] - current_question = (ci + 1) * (qi + 1) - total_questions = 6 * len(questions) update.effective_chat.send_message( - f"❓{category} - " - f"[{current_question}/{total_questions}] " + f"❓[{ci + 1}/6] {category} - " + f"[{qi + 1}/6] " f"{question}\n\nAnswers: [0/{len(quiz)}]", reply_markup=get_reply_markup(options, ci, qi), parse_mode=telegram.ParseMode.HTML, @@ -70,124 +77,148 @@ def ask_question(self, update, quiz): quiz["current"] = {"ci": ci, "qi": qi, "answers": {}} self.redis.set("quiz", quiz) + def handle_callback_query(self, update: Update, *_, **__): + query = update.callback_query + query.answer() + + if query.data == "end": + return query.edit_message_text("See you next time!") + + if query.data == "restart": + return self.handle_restart(update, *_, **__) + + category, question, *answer = query.data.split() + user = update.effective_user.full_name + quiz = self.redis.get("quiz") + if (answers := quiz["current"]["answers"]) and user in answers: + self.logger("User %s already answered", user) + return + + answers[user] = " ".join(answer) + questions = quiz.get("questions") + options = questions[int(category)]["questions"][int(question)]["options"] + players = quiz["players"] + category_text = query.message.text.split("-")[0] + text = query.message.text.replace(category_text, f"{category_text}") + answers_count = f"Answers: [{len(answers)}/{len(players)}]" + category_and_question = text.split("\n\n")[0] + try: + query.edit_message_text( + category_and_question + "\n\n" + answers_count, + reply_markup=get_reply_markup(options, category, question), + parse_mode=telegram.ParseMode.HTML, + ) + except telegram.error.BadRequest as e: + self.logger.exception(e) + return query.edit_message_text( + f"{text}\nOld message - not acting", + parse_mode=telegram.ParseMode.HTML, + reply_markup=get_reply_markup(options, category, question), + ) + + if set(answers) == set(players): + current_questions = questions[int(category)]["questions"] + results = " | ".join(f"{p}: {a}" for p, a in answers.items()) + correct_answer = current_questions[int(question)]["a"] + for player, answer in answers.items(): + if answer == correct_answer: + players[player] += 1 + answer = f"Correct: {correct_answer}" + query.edit_message_text( + f"{category_and_question}\n{answer}\n{results}", + parse_mode=telegram.ParseMode.HTML, + ) + self.ask_question(update, quiz) + def handle_categories(self, update: Update, *_, **__): + message = update.effective_message quiz = self.redis.get("quiz") - if categories := update.message.text.lstrip("/categories "): # noqa: B005 + if categories := message.text.lstrip("/categories "): # noqa: B005 quiz["categories"] = categories return self.redis.set("quiz", quiz) if not (categories := quiz.get("categories")): return self.reply( - update, + message, "Categoriile nu sunt definite.\n" "Se pot seta cu: /categories categorie1, categorie2", ) - self.reply(update, f"Categoriile sunt: {', '.join(categories)}") + self.reply(message, f"Categoriile sunt: {', '.join(categories)}") def handle_new(self, update: Update, *_, **__): - self.reset(update) + self.reset(update.effective_message) def handle_ready(self, update: Update, *_, **__): new_player = update.effective_user.full_name quiz = self.redis.get("quiz") or {} players = quiz.get("players", {}) if new_player in players: - return self.reply(update, "You're already registered") + return self.reply(update.effective_message, "You're already registered") players[new_player] = 0 self.redis.set("quiz", quiz) self.reply( - update, + update.effective_message, f"{new_player} joined\nCurrent players: {', '.join(list(players))}", ) + def handle_regenerate_questions(self, update: Update, *_, **__): + quiz = self.redis.get("quiz") + self.regenerate_questions(update.effective_message, quiz) + + def handle_restart(self, update: Update, *_, **__): + quiz = self.redis.get("quiz") + quiz["current"] = {} + for player, _ in quiz["players"].items(): + quiz["players"][player] = 0 + + self.redis.set("quiz", quiz) + text = ( + "Scores cleared, hit /start for a new quiz\n" + "Hit /regenerate for new questions" + ) + try: + update.effective_message.edit_text(text) + except telegram.error.BadRequest: + update.effective_message.bot.send_message(update.effective_chat.id, text) + def handle_start(self, update: Update, *_, **__): + message = update.effective_message if not (quiz := self.redis.get("quiz")): - quiz = self.reset(update) + quiz = self.reset(message) if not quiz.get("players"): - return self.reply(update, "No players registered. Register doing /ready ") + return self.reply(message, "No players registered. Register doing /ready ") - self.reply(update, "Starting in 3...") + self.reply(message, "Starting in 3...") time.sleep(1) - self.reply(update, "Starting in 2...") + self.reply(message, "Starting in 2...") time.sleep(1) - self.reply(update, "Starting in 1...") + self.reply(message, "Starting in 1...") time.sleep(1) self.ask_question(update, quiz) def handle_status(self, update: Update, *_, **__): if not (quiz := self.redis.get("quiz")): - return self.reply(update, "No players registered. Register doing /ready ") - - status = "\n".join(f"{player}: {score}" for player, score in quiz.items()) - self.reply(update, f"Current status\n{status}") - - def handle_callback_query(self, update: Update, *_, bot, **__): - query = update.callback_query - query.answer() - - category, question, *answer = query.data.split() - user = update.effective_user.full_name - quiz = self.redis.get("quiz") - if user in (answers := quiz["current"].get("answers")): - self.logger("User %s already answered", user) - return - - answers[user] = " ".join(answer) - questions = quiz.get("questions") - options = questions[int(category)]["questions"][int(question)]["options"] - players = quiz["players"] - category_text = query.message.text.split("-")[0] - text = query.message.text.replace(category_text, f"{category_text}") - text = ( - text.split("\n\n")[0] + "\n\n" + f"Answers: [{len(answers)}/{len(players)}]" - ) - query.edit_message_text( - text, - reply_markup=get_reply_markup(options, category, question), - parse_mode=telegram.ParseMode.HTML, - ) - - if set(answers) == set(players): - current_questions = questions[int(category)]["questions"] - results = " | ".join(f"{p}: {a}" for p, a in answers.items()) - correct_answer = current_questions[int(question)]["a"] - for player, answer in answers.items(): - if answer == correct_answer: - players[player] += 1 - scores = " | ".join(f"{p}: {s}" for p, s in players.items()) - - answer = f"Correct answer: {correct_answer}" - bot.send_message( - update.effective_chat.id, - f"{answer}\nAnswers: {results}\nScores: {scores}", + return self.reply( + update.effective_message, + "No players registered. Register doing /ready ", ) - self.ask_question(update, quiz) - def handle_regenerate_questions(self, update: Update, *_, **__): - quiz = self.redis.get("quiz") - self.regenerate_questions(update, quiz) - - def handle_restart(self, update: Update, *_, **__): - quiz = self.redis.get("quiz") - quiz["current"] = {} - for player, _ in quiz["players"].items(): - quiz["players"][player] = 0 - - self.redis.set("quiz", quiz) - self.reply(update, "Restarted") + status = "\n".join(f"{player}: {score}" for player, score in quiz.items()) + self.reply(update.effective_message, f"Current status\n{status}") - def regenerate_questions(self, update: Update, quiz): + def regenerate_questions(self, message: telegram.Message, quiz): if not (categories := quiz.get("categories")): categories = DEFAULT_CATEGORIES - self.reply(update, "Generating questions...") + self.reply(message, "Generating questions...") try: response = generate_content( prompt=f""" - Genereaza un quiz standard cu urmatoarele categorii: + Genereaza un quiz standard cu intrebari noi pentru + urmatoarele categorii: {', '.join(categories)} Fiecare categorie continand cate 6 intrebari. Pentru fiecare intrebare vreau cate 4 variante de raspuns @@ -226,23 +257,23 @@ def regenerate_questions(self, update: Update, quiz): ) except GeminiError as e: self.logger.exception(e) - return self.reply(update, "Got an error trying to process your message") + return self.reply(message, "Got an error trying to process your message") try: quiz["questions"] = json.loads(response)["questions"] except (json.JSONDecodeError, IndexError) as e: self.logger.exception(e) - return self.reply(update, "Eroare la generarea intrebarilor") + return self.reply(message, "Eroare la generarea intrebarilor") self.redis.set("quiz", quiz) return quiz - def reset(self, update): + def reset(self, message: telegram.Message): quiz = {"categories": DEFAULT_CATEGORIES, "current": {}, "players": {}} - quiz = self.regenerate_questions(update, quiz) + quiz = self.regenerate_questions(message, quiz) self.redis.set(key="quiz", value=quiz) self.reply( - update, + message, "S-a creat un quiz nou\n" "Sa te alaturi: /ready\n" "Categoriile: /categories\n" diff --git a/src/mainframe/clients/bot.py b/src/mainframe/clients/bot.py index e538d136..70612eef 100644 --- a/src/mainframe/clients/bot.py +++ b/src/mainframe/clients/bot.py @@ -3,7 +3,6 @@ from mainframe.bots.models import Bot from mainframe.clients.logs import get_default_logger from mainframe.clients.storage import RedisClient -from telegram import Update def is_whitelisted(func): @@ -20,7 +19,7 @@ def wrapper(self, update, context, *args, **kwargs): return bot.last_called_on = timezone.now() bot.save() - return func(self, update, *args, bot=bot, context=context, **kwargs) + return func(self, update, *args, context=context, **kwargs) return wrapper @@ -45,12 +44,9 @@ def __init__(self, logger=None): self.logger = logger or get_default_logger(__name__) self.redis = RedisClient(self.logger) - def reply(self, update: Update, text: str, **kwargs): - if not update.message: - self.logger.error( - "Can't reply - no message to reply to: '%s'", update.to_dict() - ) - return + def reply(self, message: telegram.Message, text: str, **kwargs): + if not message: + return self.logger.error("Can't reply - message is empty") default_kwargs = { "disable_notification": True, @@ -59,7 +55,7 @@ def reply(self, update: Update, text: str, **kwargs): **kwargs, } try: - update.message.reply_text(text, **default_kwargs) + message.reply_text(text, **default_kwargs) except telegram.error.TelegramError as e: if "can't find end of the entity" in str(e): location = int(e.message.split()[-1]) @@ -67,11 +63,11 @@ def reply(self, update: Update, text: str, **kwargs): "Error parsing markdown - skipping '%s'", text[location] ) return self.reply( - update, f"{text[:location]}\\{text[location]}{text[location + 1:]}" + message, f"{text[:location]}\\{text[location]}{text[location + 1:]}" ) self.logger.warning("Couldn't send markdown '%s'. (%s)", text, e) try: - update.message.reply_text(text) + message.reply_text(text) except telegram.error.TelegramError as e: self.logger.exception("Error sending unformatted message. (%s)", e) - update.message.reply_text("Got an error trying to send response") + message.reply_text("Got an error trying to send response")