Skip to content

Commit

Permalink
feat: add hydrate command (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
pekasen authored Apr 12, 2023
1 parent 61094eb commit 0029a8d
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 48 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,29 @@ Options:
--help Show this message and exit.
```

## HYDRATE

To rehydrate messages from the API, this command accepts a file with message IDs in the format of `$channel_name/$post_number`, one ID per line.
Both the input and the output file are optional; if they are not given, `stdin` and `stdout` are used.

Output data is JSONL, one message per line.

```text
Usage: tegracli hydrate [OPTIONS] [INPUT_FILE] [OUTPUT_FILE]
Hydrate a file with messages-ids.
Options:
--help Show this message and exit.
```

For example, to rehydrate message IDs:

```bash
echo test_channel/1234 | tegracli hydrate
>> {"_":"Message","id": 1234, ... , "restriction_reason":[],"ttl_period":null}
```

## GROUP INIT and GROUP RUN

In order to support updatable and long-running collections `tegracli` sports an *account group* feature which retrieves
Expand Down
67 changes: 36 additions & 31 deletions tegracli/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@
from .types import MessageHandler
from .utilities import str_dict

# pylint: disable=I1101 # c-extensions-no-member; we know it's there and that's why we don't
# want to see it


async def dispatch_iter_messages(
    client: TelegramClient, params: Dict, callback: MessageHandler
) -> None:
    """Dispatch a TG-method with callback.

    Iterates over ``client.iter_messages`` and awaits ``callback`` once for
    every item it yields, in the order they are yielded.

    Args:
        client: the client to use.
        params: the parameters to pass to the method. A ``wait_time`` entry
            replaces the default of 10 that is applied otherwise.
        callback: the callback to pass data to.
    """
    # Merge the default into the parameters instead of passing ``wait_time=10``
    # next to ``**params``: giving the same keyword twice raises a TypeError.
    request = {"wait_time": 10, **params}
    async for message in client.iter_messages(**request):
        await callback(message)


async def dispatch_get(users, client: TelegramClient, params: Dict):
"""Get the message history of a specified set of users."""

for user in users:
done = False
while done is False:
Expand Down Expand Up @@ -61,10 +63,24 @@ async def dispatch_get(users, client: TelegramClient, params: Dict):
done = True


async def dispatch_hydrate(
    channel: str,
    post_ids: List[int],
    output_file: TextIOWrapper,
    client: TelegramClient,
):
    """Dispatch a hydration by channel_id/post_id.

    Args:
        channel: the channel the posts belong to; passed on as ``entity``.
        post_ids: the post numbers to fetch; passed on as ``ids``.
        output_file: opened file the hydrated messages are written to.
        client: the client to use.
    """
    write_message = partial(handle_message, file=output_file, injects=None)

    async def _write_if_found(message):
        # When fetching by ID, Telethon yields ``None`` in place of a post it
        # cannot find (e.g. a deleted one). ``handle_message`` calls
        # ``message.to_dict()``, so passing ``None`` on would abort the run.
        if message is None:
            log.warning(f"Skipping a post in {channel} that could not be retrieved")
            return
        await write_message(message)

    await dispatch_iter_messages(
        client,
        {"entity": channel, "ids": post_ids},
        _write_if_found,
    )


async def dispatch_search(queries: List[str], client: TelegramClient):
"""Dispatch a global search."""
local_account = await client.get_me()
log.info(f"Using telegram accout of {local_account.username}")
log.info(f"Using telegram account of {local_account.username}")
for query in queries:
try:
async for message in client.iter_messages(None, search=query, limit=15):
Expand All @@ -84,18 +100,11 @@ async def handle_message(
):
"""Accept incoming messages and log them to disk.
Parameters
----------
message : telethon.types.Message : incoming single message
file : TextIOWrapper : opened file to dump the message's json into
Returns
-------
None : nada, nothing
Args:
message: incoming single message.
file: opened file to dump the message's json into.
injects: additional data to inject into the message.
"""
# log.debug(f"Received {message.peer_id.channel_id}/{message.id}")
m_dict = str_dict(message.to_dict())
if injects is not None:
for key, value in injects.items():
Expand All @@ -110,15 +119,12 @@ async def get_input_entity(
) -> Optional[telethon.types.TypeInputPeer]:
"""Wraps the client.get_input_entity function.
Parameters
----------
Args:
client: signed in TG client.
member_id: id/handle/URL of the entity to get.
client : TelegramClient : signed in TG client
member_id : int : id/handle/URL of the entity to get
Returns
-------
Optional[telethon.types.TypeInputPeer] : returns the requested entity or None
Returns:
Optional[telethon.types.TypeInputPeer] : returns the requested entity or None
"""
return await client.get_input_entity(member_id)

Expand All @@ -128,11 +134,10 @@ async def get_profile(
) -> Optional[Dict[str, str]]:
"""Returns a Dict from the requested entity.
Parameters
----------
client : TelegramClient : signed in TG client
member : str : id/handle/URL of the entity to request
Args:
client: signed in TG client.
member: id/handle/URL of the entity to request.
group_name: name of the group to save the profile to.
"""
_member = int(member) if str.isnumeric(member) else member
profile = await client.get_entity(_member)
Expand Down
27 changes: 27 additions & 0 deletions tegracli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from .dispatch import (
dispatch_get,
dispatch_hydrate,
dispatch_iter_messages,
dispatch_search,
get_input_entity,
Expand Down Expand Up @@ -79,6 +80,32 @@ def configure():
client.loop.run_until_complete(ensure_authentication(client, _handle_auth))


@cli.command()
@click.argument("input_file", type=click.File("r", encoding="utf-8"), default="-")
@click.argument("output_file", type=click.File("w", encoding="utf-8"), default="-")
@click.pass_context
def hydrate(ctx: click.Context, input_file: click.File, output_file: click.File):
    """Hydrate a file with messages-ids."""
    # The docstring doubles as the command's ``--help`` text, so the details
    # are kept in comments.
    #
    # Every input line is expected to look like ``$channel_name/$post_number``.
    # The IDs are grouped per channel so that each channel is hydrated with a
    # single dispatch.
    client = get_client(ctx.obj["credentials"])

    channel_registry = {}
    for line_number, raw_line in enumerate(input_file, start=1):
        message_id = raw_line.strip()
        if not message_id:
            # Blank lines (e.g. a trailing newline) carry no ID.
            continue

        # Split on the last slash only: the post number is always the final
        # component, whatever the channel part looks like.
        channel, _, post_number = message_id.rpartition("/")
        try:
            post_id = int(post_number)
        except ValueError:
            # int() raises on a malformed number; it never returns None.
            post_id = None

        if not channel or post_id is None:
            # stdout may be the data stream, so diagnostics go to stderr.
            click.echo(
                f"Skipping malformed message ID on line {line_number}: {message_id!r}",
                err=True,
            )
            continue

        channel_registry.setdefault(channel, []).append(post_id)

    if not channel_registry:
        # Nothing to hydrate, so there is no reason to connect.
        return

    with client:
        for channel, post_ids in channel_registry.items():
            client.loop.run_until_complete(
                dispatch_hydrate(channel, post_ids, output_file, client)
            )


@cli.command()
@click.option(
"--limit", "-l", type=int, default=-1, help="Number of messages to retrieve."
Expand Down
22 changes: 22 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,26 @@ def test_configure(runner: CliRunner, tmp_path: Path):
assert Path("tegracli.conf.yml").exists()


def test_hydrate(runner: CliRunner, tmp_path: Path):
    """Should hydrate a list of ids.

    Asserts:
        - The command exits with code 0
    """
    credentials = {
        "api_id": 123456,
        "api_hash": "wahgi231kmdma91",
        "session_name": "test",
    }
    message_ids = "\n".join(("QlobalChange/12182", "QlobalChangeEspana/162"))

    with runner.isolated_filesystem(temp_dir=tmp_path) as workdir:
        # The command reads its credentials from this file in the working
        # directory, so it has to exist before the CLI is invoked.
        with (Path(workdir) / "tegracli.conf.yml").open("w") as handle:
            yaml.dump(credentials, handle)

        # The IDs are fed through stdin because no input file is given.
        outcome = runner.invoke(cli, ["hydrate"], input=message_ids)

        assert outcome.exit_code == 0


patcher.stop()
65 changes: 48 additions & 17 deletions tests/test_dispatchers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" # Dispatcher Functions Test
"""Dispatcher Functions Tests.
This test suite concerns itself with the asynchronous dispatcher functions.
Expand All @@ -13,21 +13,23 @@
from typing import Dict, List

import pytest
import ujson
import yaml
from telethon import TelegramClient

from tegracli.dispatch import dispatch_hydrate
from tegracli.main import dispatch_get, dispatch_search


@pytest.fixture
def queries():
    """Random query strings."""
    return ["randomstring"]


@pytest.fixture
def client():
"""Get a configured client"""
"""Get a configured client."""
with Path("tegracli.conf.yml").open("r", encoding="utf8") as conf:
conf = yaml.safe_load(conf)

Expand All @@ -37,19 +39,13 @@ def client():
@pytest.mark.api
@pytest.mark.enable_socket
def test_search(queries: List[str], client: TelegramClient):
    """Should run a search on the specified queries.

    Asserts:
        - Should not throw exception
    """
    # The client is used as a context manager around the dispatch, which is
    # driven to completion on the client's own event loop.
    with client:
        client.loop.run_until_complete(dispatch_search(queries, client))


@pytest.mark.api
Expand All @@ -60,14 +56,49 @@ def test_search(queries: List[str], client: TelegramClient):
)
@pytest.mark.parametrize("queries", [["channelnotfound123"], ["channel", "1446651076"]])
def test_get(queries: List[str], client: TelegramClient, params: Dict):
"""Should get message for existing channels
"""Should get message for existing channels.
Asserts
-------
Should not throw exception
Asserts:
- Should not throw exception
"""
# client = Mock(TelegramClient)
# client.loop = Mock(AbstractEventLoop)
with client:
client.loop.run_until_complete(dispatch_get(queries, client, params))


@pytest.mark.api
@pytest.mark.enable_socket
@pytest.mark.parametrize(
    "params,results", [["QlobalChange/12182", 12182], ["QlobalChangeEspana/162", 162]]
)
def test_dispatch_hydrate(params: str, results: int, client: TelegramClient):
    """Should hydrate a single post and write it to the output file.

    Asserts:
        - Should not throw exception
        - Exactly the requested post ends up in the output file
    """
    output_file = Path("test_hydrate.jsonl")
    channel, post_id = params.split("/")

    # Mode "w" truncates whatever a previous run left behind.
    with output_file.open("w", encoding="utf-8") as file:
        with client:
            client.loop.run_until_complete(
                dispatch_hydrate(channel, [int(post_id)], file, client)
            )

    # Only blank lines are skipped. A line that is not valid JSON must fail
    # the test instead of being ignored, and so must an empty output file.
    with output_file.open("r", encoding="utf-8") as file:
        hydrated_ids = [
            int(ujson.loads(line)["id"])  # pylint: disable=I1101
            for line in file
            if line.strip()
        ]

    assert hydrated_ids == [results]

0 comments on commit 0029a8d

Please sign in to comment.