From 0029a8dc1c9adfb6a006ea594f604cb955623cab Mon Sep 17 00:00:00 2001 From: Philipp Kessling <32732590+pekasen@users.noreply.github.com> Date: Wed, 12 Apr 2023 16:04:09 +0200 Subject: [PATCH] feat: add hydrate command (#41) --- README.md | 23 ++++++++++++++ tegracli/dispatch.py | 67 +++++++++++++++++++++------------------ tegracli/main.py | 27 ++++++++++++++++ tests/test_cli.py | 22 +++++++++++++ tests/test_dispatchers.py | 65 +++++++++++++++++++++++++++---------- 5 files changed, 156 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 76dc04e..49a0759 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,29 @@ Options: --help Show this message and exit. ``` +## HYDRATE + +To rehydrate messages from the API this command accepts a file with message IDs in the format of `$channel_name/$post_number`. +Both input and output file are optional, if not given, `stdin` and `stdout` are used. + +Output data is JSONL, one message per line. + +```text +Usage: tegracli hydrate [OPTIONS] [INPUT_FILE] [OUTPUT_FILE] + + Hydrate a file with messages-ids. + +Options: + --help Show this message and exit. +``` + +For example, to rehydrate message IDs: + +```bash +echo test_channel/1234 | tegracli hydrate +>> {"_":"Message","id": 1234, ... , "restriction_reason":[],"ttl_period":null} +``` + ## GROUP INIT and GROUP RUN In order to support updatable and long-running collections `tegracli` sports an *account group* feature which retrieves diff --git a/tegracli/dispatch.py b/tegracli/dispatch.py index d0a2564..f8a6155 100644 --- a/tegracli/dispatch.py +++ b/tegracli/dispatch.py @@ -15,16 +15,19 @@ from .types import MessageHandler from .utilities import str_dict +# pylint: disable=I1101 # c-extensions-no-member; we know it's there and that's why we don't +# want to see it + async def dispatch_iter_messages( client: TelegramClient, params: Dict, callback: MessageHandler ) -> None: - """Dispatch a a TG-method with callback. + """Dispatch a TG-method with callback. - Parameters: - client : TelegramClient - params : Dict - callback : MessageHandler + Args: + client: the client to use. + params: the parameters to pass to the method. + callback: the callback to pass data to. """ async for message in client.iter_messages(wait_time=10, **params): await callback(message) @@ -32,7 +35,6 @@ async def dispatch_iter_messages( async def dispatch_get(users, client: TelegramClient, params: Dict): """Get the message history of a specified set of users.""" - for user in users: done = False while done is False: @@ -61,10 +63,24 @@ async def dispatch_get(users, client: TelegramClient, params: Dict): done = True +async def dispatch_hydrate( + channel: str, + post_ids: List[str], + output_file: TextIOWrapper, + client: TelegramClient, +): + """Dispatch a hydration by channel_id/post_id.""" + await dispatch_iter_messages( + client, + {"entity": channel, "ids": post_ids}, + partial(handle_message, file=output_file, injects=None), + ) + + async def dispatch_search(queries: List[str], client: TelegramClient): """Dispatch a global search.""" local_account = await client.get_me() - log.info(f"Using telegram accout of {local_account.username}") + log.info(f"Using telegram account of {local_account.username}") for query in queries: try: async for message in client.iter_messages(None, search=query, limit=15): @@ -84,18 +100,11 @@ async def handle_message( ): """Accept incoming messages and log them to disk. - Parameters - ---------- - - message : telethon.types.Message : incoming single message - file : TextIOWrapper : opened file to dump the message's json into - - Returns - ------- - - None : nada, nothing + Args: + message: incoming single message. + file: opened file to dump the message's json into. + injects: additional data to inject into the message. """ - # log.debug(f"Received {message.peer_id.channel_id}/{message.id}") m_dict = str_dict(message.to_dict()) if injects is not None: for key, value in injects.items(): @@ -110,15 +119,12 @@ async def get_input_entity( ) -> Optional[telethon.types.TypeInputPeer]: """Wraps the client.get_input_entity function. - Parameters - ---------- + Args: + client: signed in TG client. + member_id: id/handle/URL of the entity to get. - client : TelegramClient : signed in TG client - member_id : int : id/handle/URL of the entity to get - - Returns - ------- - Optional[telethon.types.TypeInputPeer] : returns the requested entity or None + Returns: + Optional[telethon.types.TypeInputPeer] : returns the requested entity or None """ return await client.get_input_entity(member_id) @@ -128,11 +134,10 @@ async def get_profile( ) -> Optional[Dict[str, str]]: """Returns a Dict from the requested entity. - Parameters - ---------- - - client : TelegramClient : signed in TG client - member : str : id/handle/URL of the entity to request + Args: + client: signed in TG client. + member: id/handle/URL of the entity to request. + group_name: name of the group to save the profile to. """ _member = int(member) if str.isnumeric(member) else member profile = await client.get_entity(_member) diff --git a/tegracli/main.py b/tegracli/main.py index 0d876a4..ae8d7d3 100644 --- a/tegracli/main.py +++ b/tegracli/main.py @@ -18,6 +18,7 @@ from .dispatch import ( dispatch_get, + dispatch_hydrate, dispatch_iter_messages, dispatch_search, get_input_entity, @@ -79,6 +80,32 @@ def configure(): client.loop.run_until_complete(ensure_authentication(client, _handle_auth)) +@cli.command() +@click.argument("input_file", type=click.File("r", encoding="utf-8"), default="-") +@click.argument("output_file", type=click.File("w", encoding="utf-8"), default="-") +@click.pass_context +def hydrate(ctx: click.Context, input_file: click.File, output_file: click.File): + """Hydrate a file with messages-ids.""" + client = get_client(ctx.obj["credentials"]) + + channel_registry = {} + for message_id in input_file: + channel, post_id = message_id.split("/") + + post_id = int(post_id) + if post_id is None: + continue + if channel not in channel_registry: + channel_registry[channel] = [post_id] + else: + channel_registry[channel].append(post_id) + with client: + for channel, post_ids in channel_registry.items(): + client.loop.run_until_complete( + dispatch_hydrate(channel, post_ids, output_file, client) + ) + + @cli.command() @click.option( "--limit", "-l", type=int, default=-1, help="Number of messages to retrieve." diff --git a/tests/test_cli.py b/tests/test_cli.py index 131e1fd..c575cfd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -205,4 +205,26 @@ def test_configure(runner: CliRunner, tmp_path: Path): assert Path("tegracli.conf.yml").exists() +def test_hydrate(runner: CliRunner, tmp_path: Path): + """Should hydrate a list of ids.""" + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + conf_file = Path(temp_dir) / "tegracli.conf.yml" + + with conf_file.open("w") as config: + yaml.dump( + { + "api_id": 123456, + "api_hash": "wahgi231kmdma91", + "session_name": "test", + }, + config, + ) + + test_messages = ["QlobalChange/12182", "QlobalChangeEspana/162"] + + result = runner.invoke(cli, ["hydrate"], input="\n".join(test_messages)) + + assert result.exit_code == 0 + + patcher.stop() diff --git a/tests/test_dispatchers.py b/tests/test_dispatchers.py index 3f6452a..f0979e0 100644 --- a/tests/test_dispatchers.py +++ b/tests/test_dispatchers.py @@ -1,4 +1,4 @@ -""" # Dispatcher Functions Test +"""Dispatcher Functions Tests. This tests suit concerns itself with the asynchronous dispatcher functions. @@ -13,21 +13,23 @@ from typing import Dict, List import pytest +import ujson import yaml from telethon import TelegramClient +from tegracli.dispatch import dispatch_hydrate from tegracli.main import dispatch_get, dispatch_search @pytest.fixture def queries(): - """random query strings""" + """Random query strings.""" return ["randomstring"] @pytest.fixture def client(): - """Get a configured client""" + """Get a configured client.""" with Path("tegracli.conf.yml").open("r", encoding="utf8") as conf: conf = yaml.safe_load(conf) @@ -37,19 +39,13 @@ def client(): @pytest.mark.api @pytest.mark.enable_socket def test_search(queries: List[str], client: TelegramClient): - """Should run a search on the specified queries + """Should run a search on the specified queries. - Asserts - ------- - - Should not throw exception + Asserts: + - Should not throw exception """ - - # client = Mock(TelegramClient) - # client.loop = Mock(AbstractEventLoop) with client: client.loop.run_until_complete(dispatch_search(queries, client)) - # client.assert_called() @pytest.mark.api @@ -60,14 +56,49 @@ def test_search(queries: List[str], client: TelegramClient): ) @pytest.mark.parametrize("queries", [["channelnotfound123"], ["channel", "1446651076"]]) def test_get(queries: List[str], client: TelegramClient, params: Dict): - """Should get message for existing channels + """Should get message for existing channels. - Asserts - ------- - - Should not throw exception + Asserts: + - Should not throw exception """ # client = Mock(TelegramClient) # client.loop = Mock(AbstractEventLoop) with client: client.loop.run_until_complete(dispatch_get(queries, client, params)) + + +@pytest.mark.api +@pytest.mark.enable_socket +@pytest.mark.parametrize( + "params,results", [["QlobalChange/12182", 12182], ["QlobalChangeEspana/162", 162]] +) +def test_dispatch_hydrate(params: str, results: int, client: TelegramClient): + """Should get message for existing channels. + + Asserts: + - Should not throw exception + """ + print("params", params, "results", results) + + output_file = Path("test_hydrate.jsonl") + if output_file.exists(): + output_file.unlink() + channel, post_id = params.split("/") + post_id = int(post_id) + + with output_file.open("a", encoding="utf-8") as file: + with client: + client.loop.run_until_complete( + dispatch_hydrate(channel, [post_id], file, client) + ) + + with output_file.open("r", encoding="utf-8") as file: + for line in file: + try: + res = ujson.loads( + line + ) # pylint: disable=I1101 # c-extensions-no-member, what we + # know it's there and that's why we don't want to see it + assert int(res["id"]) == results + except ValueError: + continue