diff --git a/README.rst b/README.rst index 88a0148..a2716bd 100644 --- a/README.rst +++ b/README.rst @@ -6,6 +6,8 @@ How it works **This is a first version of library** +**Only long polling mode supported!** + Library starts your python-telegram-bot object with custom url (our unit-test server on Flask running under waitress). Now you can communicate in unit-tests with your bot as you do in Telegram. @@ -15,6 +17,8 @@ Features 1. send text message 2. send command +3. send file +4. receive file Fixtures @@ -53,6 +57,27 @@ Check /start command of Echo Bot message = user.get_message() assert message['text'] == 'Hi [FN LN](tg://user?id=1)\!' + +File Bot example +--------------------------- + +Bot renames file you send to him. + +.. code:: + + def test_echobot_file(bot, user): + current_dir = os.path.dirname(os.path.abspath(__file__)) + + user.send_document(current_dir, 'test.txt') + + document = user.get_document() + + file_io = document.path_or_bytes + assert file_io.name == 'echo_test.txt' + + content = io.TextIOWrapper(file_io, encoding='utf-8').read() + assert content == 'Hello world!\nHello world!' + ========== Installing ========== diff --git a/conftest.py b/conftest.py index 75d8b33..99db3f3 100644 --- a/conftest.py +++ b/conftest.py @@ -2,13 +2,18 @@ import pytest from _pytest.main import Session + @pytest.hookimpl() def pytest_sessionstart(session: Session) -> None: print("Start pytest testing") + # Load fixtures pytest_plugins = [ 'telegram_bot_unittest.fixtures', - 'echobot.fixtures' + 'telegram_bot_unittest.pytest.fixtures' + + #'examples.echobot.fixtures', + #'examples.filebot.fixtures' ] diff --git a/echobot/__init__.py b/examples/__init__.py similarity index 100% rename from echobot/__init__.py rename to examples/__init__.py diff --git a/examples/echobot/__init__.py b/examples/echobot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/echobot/echobot.py b/examples/echobot/echobot.py similarity index 71% rename from echobot/echobot.py rename to examples/echobot/echobot.py index 9a926a7..917c08e 100644 --- a/echobot/echobot.py +++ b/examples/echobot/echobot.py @@ -7,8 +7,6 @@ BOT_TOKEN = os.getenv("BOT_TOKEN") -# Define a few command handlers. These usually take the two arguments update and -# context. def start(update: Update, context: CallbackContext) -> None: """Send a message when the command /start is issued.""" user = update.effective_user @@ -30,20 +28,16 @@ def echo(update: Update, context: CallbackContext) -> None: def setup_bot(bot_token: str, base_url: str = None) -> Updater: """Start the bot.""" - # Create the Updater and pass it your bot's token. + updater = Updater(bot_token, base_url=base_url) - # Get the dispatcher to register handlers dispatcher = updater.dispatcher - # on different commands - answer in Telegram dispatcher.add_handler(CommandHandler("start", start)) dispatcher.add_handler(CommandHandler("help", help_command)) - # on non command i.e message - echo the message on Telegram dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, echo)) - # Start the Bot updater.start_polling() return updater @@ -53,9 +47,6 @@ def main(base_url: str = None) -> None: updater = setup_bot(BOT_TOKEN, base_url) - # Run the bot until you press Ctrl-C or the process receives SIGINT, - # SIGTERM or SIGABRT. This should be used most of the time, since - # start_polling() is non-blocking and will stop the bot gracefully. updater.idle() diff --git a/echobot/fixtures.py b/examples/echobot/fixtures.py similarity index 88% rename from echobot/fixtures.py rename to examples/echobot/fixtures.py index dec7967..298e45f 100644 --- a/echobot/fixtures.py +++ b/examples/echobot/fixtures.py @@ -1,6 +1,8 @@ import pytest -from echobot.echobot import setup_bot + +from examples.echobot.echobot import setup_bot + from telegram_bot_unittest.routes import TELEGRAM_URL from telegram_bot_unittest.user import BOT_TOKEN, CHAT_ID @@ -18,7 +20,7 @@ def bot(telegram_server): user2_id = CHAT_ID+1 u2 = UserBase(user2_id) -chat2 = ChatBase(user2_id) +chat2 = ChatBase(u2) @pytest.fixture(scope='session') diff --git a/echobot/pytest_echobot.py b/examples/echobot/pytest_echobot.py similarity index 55% rename from echobot/pytest_echobot.py rename to examples/echobot/pytest_echobot.py index 3652bfa..dcb36e5 100644 --- a/echobot/pytest_echobot.py +++ b/examples/echobot/pytest_echobot.py @@ -7,7 +7,7 @@ def test_echobot_start(bot, user): message = user.get_message() assert message - assert message['text'] == 'Hi [FN LN](tg://user?id=1)\!' + assert message['text'] == 'Hi [FirstName LastName](tg://user?id=1)\!' def test_echobot_help(bot, user): @@ -20,7 +20,7 @@ def test_echobot_help(bot, user): assert message['text'] == 'Help!' -def test_echobot_message(bot, user, user2): +def test_echobot_message(bot, user): user.send_message('testing message') @@ -29,9 +29,19 @@ def test_echobot_message(bot, user, user2): assert message assert message['text'] == 'testing message' - user2.send_message('testing message') + +def test_echobot_multiple_users(bot, user, user2): + + user.send_message('my name user1') + + message = user.get_message() + + assert message + assert message['text'] == 'my name user1' + + user2.send_message('my name user2') message = user2.get_message() assert message - assert message['text'] == 'testing message' + assert message['text'] == 'my name user2' diff --git a/examples/filebot/__init__.py b/examples/filebot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/filebot/filebot.py b/examples/filebot/filebot.py new file mode 100644 index 0000000..049d09e --- /dev/null +++ b/examples/filebot/filebot.py @@ -0,0 +1,56 @@ + +import os +import io + +from telegram import Update, ForceReply +from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext + + +BOT_TOKEN = os.getenv("BOT_TOKEN") + + +def start(update: Update, context: CallbackContext) -> None: + update.message.reply_text('Hi!') + + +def echo_document(update: Update, context: CallbackContext) -> None: + """Echo the user message.""" + chat_id = update.message.chat_id + user_id = update.message.from_user.id + + attachment = io.BytesIO() + context.bot.get_file(update.message.document).download(out=attachment) + attachment.seek(0) + + content = attachment.read().decode('utf-8') + + f = io.BytesIO(bytes(content, "utf-8")) + f.name = 'echo_' + update.message.document.file_name + f.seek(0) + + context.bot.send_document(chat_id, f) + + +def setup_bot(bot_token: str, base_url: str = None) -> Updater: + """Start the bot.""" + updater = Updater(bot_token, base_url=base_url, base_file_url=base_url) + + dispatcher = updater.dispatcher + + dispatcher.add_handler(CommandHandler("start", start)) + + dispatcher.add_handler(MessageHandler(Filters.attachment, echo_document)) + + updater.start_polling() + + return updater + + +def main(base_url: str = None) -> None: + + updater = setup_bot(BOT_TOKEN, base_url) + updater.idle() + + +if __name__ == '__main__': + main() diff --git a/examples/filebot/fixtures.py b/examples/filebot/fixtures.py new file mode 100644 index 0000000..3c3fd0b --- /dev/null +++ b/examples/filebot/fixtures.py @@ -0,0 +1,12 @@ + +import pytest +from examples.filebot.filebot import setup_bot +from telegram_bot_unittest.routes import TELEGRAM_URL +from telegram_bot_unittest.user import BOT_TOKEN + + +@pytest.fixture(scope='session') +def bot(telegram_server): + updater = setup_bot(BOT_TOKEN, TELEGRAM_URL) + yield updater.bot + updater.stop() diff --git a/examples/filebot/pytest_filebot.py b/examples/filebot/pytest_filebot.py new file mode 100644 index 0000000..b0018af --- /dev/null +++ b/examples/filebot/pytest_filebot.py @@ -0,0 +1,27 @@ +import os +import io + + +def test_echobot_start(bot, user): + + user.send_command('/start') + + message = user.get_message() + + assert message + assert message['text'] == 'Hi!' + + +def test_echobot_file(bot, user): + + current_dir = os.path.dirname(os.path.abspath(__file__)) + + user.send_document(current_dir, 'test.txt') + + document = user.get_document() + + file_io = document.path_or_bytes + assert file_io.name == 'echo_test.txt' + + content = io.TextIOWrapper(file_io, encoding='utf-8').read() + assert content == 'Hello world!\nHello world!' diff --git a/examples/filebot/test.txt b/examples/filebot/test.txt new file mode 100644 index 0000000..2c16ffc --- /dev/null +++ b/examples/filebot/test.txt @@ -0,0 +1,2 @@ +Hello world! +Hello world! \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index fcdd206..3b52824 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,7 @@ [pytest] FAIL_INVALID_TEMPLATE_VARS = True norecursedirs = venv -python_files = pytest_*.py +#python_files = pytest_*.py +python_files = test_*.py addopts = -p no:warnings --strict-markers --log-cli-level=INFO diff --git a/requirements.txt b/requirements.txt index a826a87..4b9979a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ python-telegram-bot pytest -pytest-cov flask waitress + +pytest-cov codecov \ No newline at end of file diff --git a/setup.py b/setup.py index 8a5b114..9baacc1 100644 --- a/setup.py +++ b/setup.py @@ -2,11 +2,12 @@ from setuptools import setup VERSION_MAJOR = 0 -VERSION_MINOR = 2 +VERSION_MINOR = 3 ver = '%d.%d' % (VERSION_MAJOR, VERSION_MINOR) backlog = """ +0.3 - 22.05.24 - send & receive documents 0.2 - 22.04.13 - add multiple users @@ -34,8 +35,11 @@ url='https://github.com/dontsovcmc/telegram_bot_unittest', include_package_data=True, packages=[ - 'echobot', - 'telegram_bot_unittest' + 'examples', + 'examples.echobot', + 'examples.filebot', + 'telegram_bot_unittest', + 'telegram_bot_unittest.pytest' ], classifiers=[ 'Development Status :: 5 - Production/Stable', diff --git a/telegram_bot_unittest/core.py b/telegram_bot_unittest/core.py index 4899825..e14b184 100644 --- a/telegram_bot_unittest/core.py +++ b/telegram_bot_unittest/core.py @@ -2,6 +2,11 @@ from datetime import datetime from queue import Empty, Queue +from telegram.utils.types import JSONDict + +import logging +logger = logging.getLogger(__name__) + # online telegram-bot-sdk # https://telegram-bot-sdk.readme.io/reference/getupdates @@ -14,13 +19,6 @@ def now() -> int: return int((datetime.utcnow() - epoch).total_seconds()) -def result_ok(result: List, ok: bool = True) -> Dict: - return { - "ok": ok, - "result": result - } - - class TelegramCore: def __init__(self): @@ -28,56 +26,59 @@ def __init__(self): self._update_counter = 0 self.income = {} - def init_queue(self, id) -> None: + def init_queue(self, id: int) -> None: + """ + Initialize message queue + :param id: chat_id + :return: + """ + if isinstance(id, str): + raise Exception('chat_id should be int') if id not in self.income: - self.income[id] = Queue() - - def user_send(self, bot_id, user_from, chat, text) -> Dict: - + self.income[id] = Queue(100) + + def send(self, receiver_id: int, sender: JSONDict, chat: JSONDict, **kwargs) -> JSONDict: + """ + Message from User to Bot + :param receiver_id: + :param sender: + :param chat: + :param kwargs: text, document, entities, contact + :return: + """ self._message_counter += 1 message = {'message_id': self._message_counter, - 'from': user_from, + 'from': sender, 'chat': chat, - 'date': now(), - 'text': text + 'date': now() } - self.init_queue(bot_id) - self.income[bot_id].put(message) - return message - - def user_send_command(self, bot_id: int, user_from, chat, command) -> None: - - self._message_counter += 1 - message = {'message_id': self._message_counter, - 'from': user_from, - 'chat': chat, - 'date': now(), - 'text': command, - "entities": [ - { - "offset": 0, - "length": len(command), # TODO ? - "type": "bot_command" - }] - } + for i in kwargs.keys(): + message.update({i: kwargs.get(i)}) - self.init_queue(bot_id) - self.income[bot_id].put(message) - - def bot_send(self, bot_from, chat, text) -> Dict: - - self._message_counter += 1 - message = {'message_id': self._message_counter, - 'from': bot_from, - 'chat': chat, - 'date': now(), - 'text': text} - - self.init_queue(chat['id']) - self.income[chat['id']].put(message) + self.init_queue(receiver_id) + self.income[receiver_id].put(message, block=False) return message + def send_command(self, bot_id: int, sender: JSONDict, chat: JSONDict, command: str) -> JSONDict: + """ + Command message from User to Bot + :param bot_id: + :param sender: + :param user_from: + :param chat: + :param command: + :return: + """ + entities = [ + { + "offset": 0, + "length": len(command), # TODO ? + "type": "bot_command" + } + ] + return self.send(bot_id, sender, chat, text=command, entities=entities) + def get_updates(self, chat_id: int, timeout: float = 2.0) -> List[Dict]: ret = [] @@ -97,5 +98,28 @@ def get_updates(self, chat_id: int, timeout: float = 2.0) -> List[Dict]: return ret + def print_queues(self): + """ + Print messages in queue if exist when turn off. + """ + for chat_id, queue in self.income.items(): + + try: + message = self.income[chat_id].get_nowait() + + out = f'chat_id: {chat_id}\n' + + while message: + try: + out += str(message) + '\n' + message = self.income[chat_id].get_nowait() + except Empty: + break + + if out: + logger.info(out) + except Empty: + pass + core = TelegramCore() \ No newline at end of file diff --git a/telegram_bot_unittest/fixtures.py b/telegram_bot_unittest/fixtures.py index 9b4101b..3ed7a3e 100644 --- a/telegram_bot_unittest/fixtures.py +++ b/telegram_bot_unittest/fixtures.py @@ -1,12 +1,16 @@ import pytest +import logging + from .routes import start_server, shutdown_server from .user import Tester, UserBase, ChatBase from .core import core +logger = logging.getLogger(__name__) + u = UserBase() -chat = ChatBase() +chat = ChatBase(u) @pytest.fixture(scope='session') @@ -18,5 +22,8 @@ def user() -> Tester: @pytest.fixture(scope='session') def telegram_server(): s, t = start_server() + logger.info('telegram server started') yield + core.print_queues() + logger.info('telegram server shutdown begin') shutdown_server(s, t) diff --git a/telegram_bot_unittest/pytest/__init__.py b/telegram_bot_unittest/pytest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/telegram_bot_unittest/pytest/fixtures.py b/telegram_bot_unittest/pytest/fixtures.py new file mode 100644 index 0000000..298e45f --- /dev/null +++ b/telegram_bot_unittest/pytest/fixtures.py @@ -0,0 +1,29 @@ + +import pytest + +from examples.echobot.echobot import setup_bot + +from telegram_bot_unittest.routes import TELEGRAM_URL +from telegram_bot_unittest.user import BOT_TOKEN, CHAT_ID + + +@pytest.fixture(scope='session') +def bot(telegram_server): + updater = setup_bot(BOT_TOKEN, TELEGRAM_URL) + yield updater.bot + updater.stop() + + +from telegram_bot_unittest.user import Tester, UserBase, ChatBase +from telegram_bot_unittest.core import core + +user2_id = CHAT_ID+1 + +u2 = UserBase(user2_id) +chat2 = ChatBase(u2) + + +@pytest.fixture(scope='session') +def user2() -> Tester: + user2 = Tester(core, u2, chat2) + return user2 diff --git a/telegram_bot_unittest/pytest/test_bot.py b/telegram_bot_unittest/pytest/test_bot.py new file mode 100644 index 0000000..56b1453 --- /dev/null +++ b/telegram_bot_unittest/pytest/test_bot.py @@ -0,0 +1,47 @@ + + +def test_bot_start(bot, user): + + user.send_command('/start') + + message = user.get_message() + + assert message + assert message['text'] == 'Hi [FirstName LastName](tg://user?id=1)\!' + + +def test_bot_help(bot, user): + + user.send_command('/help') + + message = user.get_message() + + assert message + assert message['text'] == 'Help!' + + +def test_bot_message(bot, user): + + user.send_message('testing message') + + message = user.get_message() + + assert message + assert message['text'] == 'testing message' + + +def test_bot_multiple_users(bot, user, user2): + + user.send_message('my name user1') + + message = user.get_message() + + assert message + assert message['text'] == 'my name user1' + + user2.send_message('my name user2') + + message = user2.get_message() + + assert message + assert message['text'] == 'my name user2' diff --git a/telegram_bot_unittest/pytest/testbot.py b/telegram_bot_unittest/pytest/testbot.py new file mode 100644 index 0000000..c79c300 --- /dev/null +++ b/telegram_bot_unittest/pytest/testbot.py @@ -0,0 +1,71 @@ +import os +import io + +from telegram import Update, ForceReply +from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext + + +BOT_TOKEN = os.getenv("BOT_TOKEN") + + +def start(update: Update, context: CallbackContext) -> None: + user = update.effective_user + update.message.reply_markdown_v2( + fr'Hi {user.mention_markdown_v2()}\!', + reply_markup=ForceReply(selective=True), + ) + + +def help_command(update: Update, context: CallbackContext) -> None: + update.message.reply_text('Help!') + + +def echo(update: Update, context: CallbackContext) -> None: + update.message.reply_text(update.message.text) + + +def echo_document(update: Update, context: CallbackContext) -> None: + """Echo the user message.""" + chat_id = update.message.chat_id + user_id = update.message.from_user.id + + attachment = io.BytesIO() + context.bot.get_file(update.message.document).download(out=attachment) + attachment.seek(0) + + content = attachment.read().decode('utf-8') + + f = io.BytesIO(bytes(content, "utf-8")) + f.name = 'echo_' + update.message.document.file_name + f.seek(0) + + context.bot.send_document(chat_id, f) + + +def setup_bot(bot_token: str, base_url: str = None) -> Updater: + + updater = Updater(bot_token, base_url=base_url) + + dispatcher = updater.dispatcher + + dispatcher.add_handler(CommandHandler("start", start)) + dispatcher.add_handler(CommandHandler("help", help_command)) + + dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, echo)) + + dispatcher.add_handler(MessageHandler(Filters.attachment, echo_document)) + + updater.start_polling() + + return updater + + +def main(base_url: str = None) -> None: + + updater = setup_bot(BOT_TOKEN, base_url) + + updater.idle() + + +if __name__ == '__main__': + main() diff --git a/telegram_bot_unittest/routes.py b/telegram_bot_unittest/routes.py index 68fe374..3e3cb0c 100644 --- a/telegram_bot_unittest/routes.py +++ b/telegram_bot_unittest/routes.py @@ -2,10 +2,13 @@ import json import threading import waitress -from flask import Flask, request, make_response - +import werkzeug +from io import BytesIO +from typing import List, Dict +from flask import Flask, request, make_response, send_file from flask import json -from .core import result_ok, core +from .core import core +from .storage import storage, FileBase, DocumentBase from .user import virtual_bot logger = logging.getLogger(__name__) @@ -17,8 +20,23 @@ TELEGRAM_URL = f'http://{HOST}:{PORT}/' -def server_response(data): +@app.errorhandler(werkzeug.exceptions.BadRequest) +def handle_bad_request(e): + logger.error('Server error: ' + str(e)) + return 'bad request!', 500 + + +app.register_error_handler(500, handle_bad_request) + +def result_ok(result: List, ok: bool = True) -> Dict: + return { + "ok": ok, + "result": result + } + + +def server_response(data): r = make_response( json.dumps(result_ok(data)), 200 @@ -29,7 +47,12 @@ def server_response(data): @app.route('//getMe', methods=['POST']) def getMe(token: str): - j = request.get_json() # cause flask bug + """ + Bot asks Telegram about himself + :param token: + :return: + """ + request.get_json() # must read body (flask bug) bot_id = int(token.split(':')[0]) @@ -41,26 +64,39 @@ def getMe(token: str): @app.route('//deleteWebhook', methods=['POST']) def deleteWebhook(token: str): - j = request.get_json() # cause flask bug + """ + Bot deletes Webhook + :param token: + :return: + """ + request.get_json() # must read body (flask bug) return server_response(True) @app.route('//getUpdates', methods=['POST']) def getUpdates(token: str): - j = request.get_json() # cause flask bug + """ + Bot checks updates (long polling) + :param token: + :return: + """ + request.get_json() # must read body (flask bug) bot_id = int(token.split(':')[0]) ret = core.get_updates(bot_id) + logger.info(ret) return server_response(ret) @app.route('//sendMessage', methods=['POST']) def sendMessage(token: str): - - #bot_id = int(token.split(':')[0]) - + """ + Bot sends message to Telegram + :param token: + :return: + """ data = request.get_json() chat_id = int(data['chat_id']) @@ -68,26 +104,99 @@ def sendMessage(token: str): chat = {'id': chat_id, 'type': 'private'} # simple chat structure - ret = core.bot_send(bot_from=virtual_bot.to_dict(), - chat=chat, - text=text - ) + ret = core.send(chat_id, + sender=virtual_bot.to_dict(), + chat=chat, + text=text) return server_response(ret) +@app.route('//sendDocument', methods=['POST']) +def sendDocument(token: str): + """ + Bot sends document to Telegram + :param token: + :return: + """ + d = request.files['document'] + f = BytesIO(d.stream.read()) + f.name = d.filename + f.seek(0) + + document = DocumentBase(f, d.filename, d.mimetype) + storage.add(document['file_id'], document) + + chat_id = int(request.form['chat_id']) + chat = {'id': chat_id, 'type': 'private'} # simple chat structure + + ret = core.send(chat_id, + sender=virtual_bot.to_dict(), + chat=chat, + document=document.to_dict()) + + return server_response(ret) + + +@app.route('//getFile', methods=['POST']) +def getFile(token: str): + """ + Bot/User ask Telegram for the file information + :param token: + :param file_id: file id + :return: + """ + data = request.get_json() + file_id = data['file_id'] + + document = storage.get(file_id) + if document: + file = FileBase(document, + file_path=f'file/{file_id}') + + return server_response(file.to_dict()) + + return result_ok([], False) + + +@app.route('//file/', methods=['GET']) +def file(token: str, file_id: str): + """ + Bot/User downloads file from Telegram + :param token: + :param file_id: file id + :return: + """ + document = storage.get(file_id) + return send_file(document.path_or_bytes, as_attachment=True) + + def start_server(): s = waitress.create_server(app, host=HOST, port=PORT) - t = threading.Thread(target=s.run) + + def run(s): + try: + s.run() + except Exception as err: + logger.error(err) + del s + logger.info("server deleted") + + t = threading.Thread(target=run, args=(s,)) t.daemon = True t.start() return s, t def shutdown_server(s, t): + logger.info('start close server') s.close() + logger.info('server closed') + if t.is_alive(): - t.join() + logger.info('start join thread') + t.join(timeout=2.0) + logger.info('thread joined') if __name__ == "__main__": diff --git a/telegram_bot_unittest/storage.py b/telegram_bot_unittest/storage.py new file mode 100644 index 0000000..363c7f9 --- /dev/null +++ b/telegram_bot_unittest/storage.py @@ -0,0 +1,65 @@ +import os +import string +import random +import mimetypes +from typing import Union, BinaryIO, Optional +from telegram import Document, File + + +def gen_id(size, chars=string.ascii_lowercase + string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for x in range(size)) + + +class DocumentBase(Document): + + def __init__(self, dir_or_bytes: Union[str, BinaryIO], file_name: str, mime_type: Optional[str] = None): + + #path_or_bytes + if isinstance(dir_or_bytes, str): + self.path_or_bytes = os.path.join(dir_or_bytes, file_name) + file_size = os.path.getsize(self.path_or_bytes) + else: + self.path_or_bytes = dir_or_bytes + file_size = dir_or_bytes.getbuffer().nbytes + + file_id = gen_id(72) + file_unique_id = gen_id(15) + + if not mime_type: + mime_type = mimetypes.types_map[f".{file_name.split('.')[1]}"] + + super().__init__(file_id=file_id, + file_unique_id=file_unique_id, + file_name=file_name, + mime_type=mime_type, + file_size=file_size + ) + + +class FileBase(File): + + def __init__(self, document: DocumentBase, file_path: str): + + super().__init__(file_id=document.file_id, + file_unique_id=document.file_unique_id, + file_name=document.file_name, + file_size=document.file_size, + file_path=file_path + ) + + +class Storage: + + def __init__(self): + self.storage = {} + + def add(self, file_id: str, document: DocumentBase): + self.storage[file_id] = document + + def get(self, file_id: str) -> DocumentBase: + + if file_id in self.storage: + return self.storage[file_id] + + +storage = Storage() \ No newline at end of file diff --git a/telegram_bot_unittest/user.py b/telegram_bot_unittest/user.py index 9e466f9..96b3133 100644 --- a/telegram_bot_unittest/user.py +++ b/telegram_bot_unittest/user.py @@ -1,5 +1,14 @@ -from typing import Dict -from telegram import User, Chat +import os +import logging +import time +from io import BytesIO +from typing import Dict, List +from telegram import User, Chat, Contact +from .storage import DocumentBase, storage +from .core import TelegramCore + +logger = logging.getLogger(__file__) + BOT_ID = 5000000000 @@ -8,69 +17,176 @@ CHAT_ID = 1 +class ContactBase(Contact): + + def __init__(self, user, phone: str = '79991112233'): + logger.debug(f'create ContactBase={user} with phone={phone}') + super().__init__( + phone, + user.first_name, + user.last_name, + user.id + ) + + class UserBase(User): def __init__(self, id: int = CHAT_ID): + logger.debug(f'create UserBase={id}') super().__init__( id, # id - 'FN', # first_name + 'FirstName', # first_name False, # is_bot - 'LN', # last_name - 'user1', # username + 'LastName', # last_name + f'user{id}', # username 'ru' # language_code ) class ChatBase(Chat): - def __init__(self, id: int = CHAT_ID): + def __init__(self, user: UserBase): + logger.debug(f'create ChatBase={id}') super().__init__( - id, + user.id, 'private', # type None, # title - 'user1', # username - 'FN', # first_name - 'LN', # last_name + user.username, + user.first_name, + user.last_name, + ) + + +class BotBase(User): + + def __init__(self, + id: int = BOT_ID, + first_name='PythonTelegramUnitTestBot', + username='PTUTestBot'): + + logger.debug(f'create BotBase={id}') + + super().__init__( + id, # id + first_name, + True, # is_bot + None, # last_name + username, + None, # language_code + True, # can_join_groups + False, # can_read_all_group_messages + False # supports_inline_queries ) -virtual_bot = User( - BOT_ID, - 'PythonTelegramUnitTestBot', # first_name - True, # is_bot - None, # last_name - 'PTUTestBot', # username - None, # language_code - True, # can_join_groups - False, # can_read_all_group_messages - False # supports_inline_queries -) +virtual_bot = BotBase() + +chat = ChatBase(virtual_bot) + + +class LessMessages(Exception): + pass class Tester: - def __init__(self, core, user, chat): + def __init__(self, core: TelegramCore, user, chat, contact: ContactBase = None): self.core = core self.user = user self.chat = chat + self.contact = contact if contact else ContactBase(user) + + @property + def id(self): + return self.user.id + + def clean_dialog(self): + while self.get_message(timeout=0.5): + pass def send_message(self, text: str) -> None: - self.core.user_send(virtual_bot.id, - user_from=self.user.to_dict(), - chat=self.chat.to_dict(), - text=text - ) + self.core.send(virtual_bot.id, + sender=self.user.to_dict(), + chat=self.chat.to_dict(), + text=text + ) def send_command(self, command: str) -> None: - self.core.user_send_command(virtual_bot.id, - user_from=self.user.to_dict(), - chat=self.chat.to_dict(), - command=command - ) - - def get_message(self, timeout=2.0) -> Dict: - messages = self.core.get_updates(self.user.id, timeout) - if messages: - return messages[0]['message'] + self.core.send_command(virtual_bot.id, + sender=self.user.to_dict(), + chat=self.chat.to_dict(), + command=command + ) + + def send_contact(self) -> None: + self.core.send(virtual_bot.id, + sender=self.user.to_dict(), + chat=self.chat.to_dict(), + contact=self.contact.to_dict()) + + def get_messages(self, count=1, timeout=2.0) -> List[Dict]: + messages = [] + start = time.time() + while len(messages) < count and time.time() - start < timeout: + messages.extend(self.core.get_updates(self.user.id, timeout)) + + if len(messages) != count: + if not messages: + raise LessMessages(f"No message received") + raise LessMessages(f"Received {len(messages)} messages. Wait {count}") + return [m['message'] for m in messages] + + def get_message(self, timeout=5.0) -> Dict: + return self.get_messages(1, timeout)[0] + + def get_message_text(self, timeout=5.0) -> str: + return self.get_messages(1, timeout)[0]['text'] + + def send_document(self, dir: str, file_name: str) -> None: + + document = DocumentBase(dir, file_name) + storage.add(document['file_id'], document) + + self.core.send(virtual_bot.id, + sender=self.user.to_dict(), + chat=self.chat.to_dict(), + document=document.to_dict() + ) + + def get_document(self, timeout=5.0) -> BytesIO: + message = self.get_message(timeout) + assert 'document' in message, 'Message hasn\'t a file' + + file_id = message['document']['file_id'] + return storage.get(file_id) + + def assert_message(self, error_message: str): + try: + self.get_messages(1) + except LessMessages: + return + raise Exception(error_message) + + def assert_get_keyboard(self, text: str, keyboard_text: str, request_contact: bool = True): + """ + {'chat_id': 47390523, + 'text': "sometext", + 'reply_markup': '{"selective": false, + "keyboard": [[{"text": "Share contact", "request_contact": true}]], + "one_time_keyboard": true, + "resize_keyboard": true + }' + } + :return: + """ + message = self.get_message() + assert text in message['text'], 'Incorrect text: ' + message['text'] + assert 'reply_markup' not in message, 'No reply_markup found' + + #TODO понять, что бот шлет, что нет ответа + #assert 'keyboard' not in message['reply_markup'], 'No keyboard found' + #keyboard = message['reply_markup']['keyboard'] + #assert keyboard_text in keyboard['text'], 'Incorrect keyboard text: ' + keyboard['text'] + #assert request_contact == keyboard['request_contact'], 'Incorrect request_contact'