Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: catch errors in dispatch_iter_messages #42

Merged
merged 1 commit into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions tegracli/dispatch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Dispatch functions that request data from Telethon and MTProto."""
import datetime
import sys
import time
from functools import partial
from io import TextIOWrapper
Expand All @@ -10,7 +11,7 @@
import ujson
from loguru import logger as log
from telethon import TelegramClient
from telethon.errors import FloodWaitError
from telethon.errors import ChannelPrivateError, FloodWaitError, UserDeactivatedError

from .types import MessageHandler
from .utilities import str_dict
Expand All @@ -29,8 +30,14 @@ async def dispatch_iter_messages(
params: the parameters to pass to the method.
callback: the callback to pass data to.
"""
async for message in client.iter_messages(wait_time=10, **params):
await callback(message)
try:
async for message in client.iter_messages(wait_time=10, **params):
await callback(message)
except UserDeactivatedError:
log.error("User account has been deactivated by Telegram. Stopping now.")
sys.exit(127)
except ChannelPrivateError:
log.error(f"Entity {params['entity']} is private. Skipping.")


async def dispatch_get(users, client: TelegramClient, params: Dict):
Expand Down
26 changes: 12 additions & 14 deletions tegracli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ def hydrate(ctx: click.Context, input_file: click.File, output_file: click.File)
else:
channel_registry[channel].append(post_id)
with client:
for channel, post_ids in channel_registry.items():
client.loop.run_until_complete(
dispatch_hydrate(channel, post_ids, output_file, client)
)
with click.progressbar(channel_registry.items()) as channel_iter:
for channel, post_ids in channel_iter:
client.loop.run_until_complete(
dispatch_hydrate(channel, post_ids, output_file, client)
)


@cli.command()
Expand Down Expand Up @@ -335,17 +336,14 @@ def _handle_group_member(member: str, conf: Group, client: TelegramClient) -> No
log.debug(f"Request with the following parameters: {_params}")

# request data from telethon and write to disk
try:
with (Path(conf.name) / (member + ".jsonl")).open("a") as member_file:
client.loop.run_until_complete(
dispatch_iter_messages(
client,
params=_params,
callback=partial(handle_message, file=member_file, injects=None),
)
with (Path(conf.name) / (member + ".jsonl")).open("a") as member_file:
client.loop.run_until_complete(
dispatch_iter_messages(
client,
params=_params,
callback=partial(handle_message, file=member_file, injects=None),
)
except telethon.errors.ChannelPrivateError:
log.error(f"channel {profile.get('username') or ''} is private. Skipping.")
)


def run_group(client: TelegramClient, groups: Tuple[str]):
Expand Down