Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add hydrate command #41

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,29 @@ Options:
--help Show this message and exit.
```

## HYDRATE

To rehydrate messages from the API, this command accepts a file with one message ID per line, in the format `$channel_name/$post_number`.
Both the input and the output file are optional; if not given, `stdin` and `stdout` are used.

Output data is JSONL, one message per line.

```text
Usage: tegracli hydrate [OPTIONS] [INPUT_FILE] [OUTPUT_FILE]

Hydrate a file with messages-ids.

Options:
--help Show this message and exit.
```

For example, to rehydrate message IDs:

```bash
echo test_channel/1234 | tegracli hydrate
>> {"_":"Message","id": 1234, ... , "restriction_reason":[],"ttl_period":null}
```

## GROUP INIT and GROUP RUN

In order to support updatable and long-running collections `tegracli` sports an *account group* feature which retrieves
Expand Down
67 changes: 36 additions & 31 deletions tegracli/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@
from .types import MessageHandler
from .utilities import str_dict

# pylint: disable=I1101 # c-extensions-no-member; we know it's there and that's why we don't
# want to see it


async def dispatch_iter_messages(
client: TelegramClient, params: Dict, callback: MessageHandler
) -> None:
"""Dispatch a a TG-method with callback.
"""Dispatch a TG-method with callback.

Parameters:
client : TelegramClient
params : Dict
callback : MessageHandler
Args:
client: the client to use.
params: the parameters to pass to the method.
callback: the callback to pass data to.
"""
async for message in client.iter_messages(wait_time=10, **params):
await callback(message)


async def dispatch_get(users, client: TelegramClient, params: Dict):
"""Get the message history of a specified set of users."""

for user in users:
done = False
while done is False:
Expand Down Expand Up @@ -61,10 +63,24 @@ async def dispatch_get(users, client: TelegramClient, params: Dict):
done = True


async def dispatch_hydrate(
    channel: str,
    post_ids: List[int],
    output_file: TextIOWrapper,
    client: TelegramClient,
):
    """Dispatch a hydration by channel_id/post_id.

    Requests the given posts of a single channel through
    `dispatch_iter_messages` and hands every received message to
    `handle_message`, which writes it to `output_file`.

    Args:
        channel: the channel to fetch the posts from, passed as `entity`.
        post_ids: the numeric post ids to fetch, passed as `ids`.
        output_file: opened file the messages are written to.
        client: the client to use.
    """
    await dispatch_iter_messages(
        client,
        {"entity": channel, "ids": post_ids},
        # No additional data is injected into hydrated messages.
        partial(handle_message, file=output_file, injects=None),
    )


async def dispatch_search(queries: List[str], client: TelegramClient):
"""Dispatch a global search."""
local_account = await client.get_me()
log.info(f"Using telegram accout of {local_account.username}")
log.info(f"Using telegram account of {local_account.username}")
for query in queries:
try:
async for message in client.iter_messages(None, search=query, limit=15):
Expand All @@ -84,18 +100,11 @@ async def handle_message(
):
"""Accept incoming messages and log them to disk.

Parameters
----------

message : telethon.types.Message : incoming single message
file : TextIOWrapper : opened file to dump the message's json into

Returns
-------

None : nada, nothing
Args:
message: incoming single message.
file: opened file to dump the message's json into.
injects: additional data to inject into the message.
"""
# log.debug(f"Received {message.peer_id.channel_id}/{message.id}")
m_dict = str_dict(message.to_dict())
if injects is not None:
for key, value in injects.items():
Expand All @@ -110,15 +119,12 @@ async def get_input_entity(
) -> Optional[telethon.types.TypeInputPeer]:
"""Wraps the client.get_input_entity function.

Parameters
----------
Args:
client: signed in TG client.
member_id: id/handle/URL of the entity to get.

client : TelegramClient : signed in TG client
member_id : int : id/handle/URL of the entity to get

Returns
-------
Optional[telethon.types.TypeInputPeer] : returns the requested entity or None
Returns:
Optional[telethon.types.TypeInputPeer] : returns the requested entity or None
"""
return await client.get_input_entity(member_id)

Expand All @@ -128,11 +134,10 @@ async def get_profile(
) -> Optional[Dict[str, str]]:
"""Returns a Dict from the requested entity.

Parameters
----------

client : TelegramClient : signed in TG client
member : str : id/handle/URL of the entity to request
Args:
client: signed in TG client.
member: id/handle/URL of the entity to request.
group_name: name of the group to save the profile to.
"""
_member = int(member) if str.isnumeric(member) else member
profile = await client.get_entity(_member)
Expand Down
27 changes: 27 additions & 0 deletions tegracli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from .dispatch import (
dispatch_get,
dispatch_hydrate,
dispatch_iter_messages,
dispatch_search,
get_input_entity,
Expand Down Expand Up @@ -79,6 +80,32 @@ def configure():
client.loop.run_until_complete(ensure_authentication(client, _handle_auth))


@cli.command()
@click.argument("input_file", type=click.File("r", encoding="utf-8"), default="-")
@click.argument("output_file", type=click.File("w", encoding="utf-8"), default="-")
@click.pass_context
def hydrate(ctx: click.Context, input_file: click.File, output_file: click.File):
    """Hydrate a file with messages-ids."""
    # NOTE: the docstring above is rendered as the command's `--help` text,
    # so implementation details are kept in comments.
    #
    # `input_file` is read line by line; every line is expected to look like
    # `channel/post_id`. Blank lines are ignored and malformed lines are
    # skipped with a warning on stderr (stdout may carry the JSONL output).
    client = get_client(ctx.obj["credentials"])

    # Group the post ids per channel so that every channel is dispatched once.
    channel_registry = {}
    for line_number, line in enumerate(input_file, start=1):
        message_id = line.strip()
        if not message_id:
            continue

        parts = message_id.split("/")
        try:
            if len(parts) != 2 or not parts[0]:
                raise ValueError(message_id)
            post_id = int(parts[1])
        except ValueError:
            click.echo(
                f"Skipping malformed message id on line {line_number}: "
                f"{message_id!r}",
                err=True,
            )
            continue

        channel_registry.setdefault(parts[0], []).append(post_id)

    with client:
        for channel, post_ids in channel_registry.items():
            client.loop.run_until_complete(
                dispatch_hydrate(channel, post_ids, output_file, client)
            )


@cli.command()
@click.option(
"--limit", "-l", type=int, default=-1, help="Number of messages to retrieve."
Expand Down
22 changes: 22 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,26 @@ def test_configure(runner: CliRunner, tmp_path: Path):
assert Path("tegracli.conf.yml").exists()


def test_hydrate(runner: CliRunner, tmp_path: Path):
    """Should hydrate a list of ids.

    Writes a minimal configuration into an isolated working directory, feeds
    two message ids to the `hydrate` command via stdin and expects a clean
    exit.
    """
    credentials = {
        "api_id": 123456,
        "api_hash": "wahgi231kmdma91",
        "session_name": "test",
    }
    message_ids = ["QlobalChange/12182", "QlobalChangeEspana/162"]

    with runner.isolated_filesystem(temp_dir=tmp_path) as work_dir:
        config_path = Path(work_dir) / "tegracli.conf.yml"
        with config_path.open("w") as handle:
            yaml.dump(credentials, handle)

        stdin_payload = "\n".join(message_ids)
        outcome = runner.invoke(cli, ["hydrate"], input=stdin_payload)

        assert outcome.exit_code == 0


patcher.stop()
65 changes: 48 additions & 17 deletions tests/test_dispatchers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" # Dispatcher Functions Test
"""Dispatcher Functions Tests.

This test suite concerns itself with the asynchronous dispatcher functions.

Expand All @@ -13,21 +13,23 @@
from typing import Dict, List

import pytest
import ujson
import yaml
from telethon import TelegramClient

from tegracli.dispatch import dispatch_hydrate
from tegracli.main import dispatch_get, dispatch_search


@pytest.fixture
def queries():
"""random query strings"""
"""Random query strings."""
return ["randomstring"]


@pytest.fixture
def client():
"""Get a configured client"""
"""Get a configured client."""
with Path("tegracli.conf.yml").open("r", encoding="utf8") as conf:
conf = yaml.safe_load(conf)

Expand All @@ -37,19 +39,13 @@ def client():
@pytest.mark.api
@pytest.mark.enable_socket
def test_search(queries: List[str], client: TelegramClient):
"""Should run a search on the specified queries
"""Should run a search on the specified queries.

Asserts
-------

Should not throw exception
Asserts:
- Should not throw exception
"""

# client = Mock(TelegramClient)
# client.loop = Mock(AbstractEventLoop)
with client:
client.loop.run_until_complete(dispatch_search(queries, client))
# client.assert_called()


@pytest.mark.api
Expand All @@ -60,14 +56,49 @@ def test_search(queries: List[str], client: TelegramClient):
)
@pytest.mark.parametrize("queries", [["channelnotfound123"], ["channel", "1446651076"]])
def test_get(queries: List[str], client: TelegramClient, params: Dict):
"""Should get message for existing channels
"""Should get message for existing channels.

Asserts
-------

Should not throw exception
Asserts:
- Should not throw exception
"""
# client = Mock(TelegramClient)
# client.loop = Mock(AbstractEventLoop)
with client:
client.loop.run_until_complete(dispatch_get(queries, client, params))


@pytest.mark.api
@pytest.mark.enable_socket
@pytest.mark.parametrize(
    "params,results", [["QlobalChange/12182", 12182], ["QlobalChangeEspana/162", 162]]
)
def test_dispatch_hydrate(params: str, results: int, client: TelegramClient):
    """Should hydrate a single message of an existing channel.

    Asserts:
        - Should not throw exception
        - Every written line is valid JSON carrying the requested message id
        - At least one message was written
    """
    output_file = Path("test_hydrate.jsonl")
    # Start from a clean file; a previous run may have left one behind.
    if output_file.exists():
        output_file.unlink()
    channel, post_id = params.split("/")
    post_id = int(post_id)

    with output_file.open("a", encoding="utf-8") as file:
        with client:
            client.loop.run_until_complete(
                dispatch_hydrate(channel, [post_id], file, client)
            )

    # Parse strictly: a line that is not valid JSON must fail the test instead
    # of being skipped silently. Only blank lines are tolerated.
    with output_file.open("r", encoding="utf-8") as file:
        hydrated_ids = [
            int(ujson.loads(line)["id"])  # pylint: disable=I1101
            for line in file
            if line.strip()
        ]

    assert hydrated_ids, "no message was written to the output file"
    assert all(message_id == results for message_id in hydrated_ids)