-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_handler.py
130 lines (105 loc) · 5.41 KB
/
message_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from typing import List, Dict, Set
from telethon import TelegramClient, types
from telethon.tl.types import Message
from loguru import logger
class MessageHandler:
def __init__(self, client: TelegramClient):
self.client = client
self._processed_messages = set() # Cache of processed message IDs
self._processed_albums = set() # Cache of processed album IDs
async def process_message(self, message: Message, target_channel_id: int) -> None:
"""Process single message or part of album"""
try:
if message.id in self._processed_messages:
logger.debug(f"Skipping already processed message {message.id}")
return
# Handle grouped messages (albums)
if message.grouped_id:
group_id = str(message.grouped_id)
if group_id in self._processed_albums:
logger.debug(f"Skipping already processed album {group_id}")
return
album_messages = await message.client.get_messages(
message.chat_id,
ids=[message.id - i for i in range(10)]
)
is_first_in_album = all(
msg is None or msg.grouped_id != message.grouped_id
for msg in album_messages
if msg and msg.id < message.id
)
if not is_first_in_album:
logger.debug(f"Skipping non-first message {message.id} in album {group_id}")
return
logger.debug(f"Processing first message {message.id} of album {group_id}")
await self._handle_album_message(message, target_channel_id)
else:
await self._forward_single_message(message, target_channel_id)
except Exception as e:
logger.error(f"Error in process_message: {str(e)}")
async def _handle_album_message(self, message: Message, target_channel_id: int) -> None:
"""Handle message that is part of an album"""
if not message.grouped_id:
return
group_id = str(message.grouped_id)
logger.debug(f"Starting to process album {group_id} (message {message.id})")
try:
# Get messages by grouped_id
album_messages = []
async for msg in message.client.iter_messages(
message.chat_id,
limit=50,
wait_time=0
):
if msg.grouped_id != message.grouped_id:
continue
if isinstance(msg.media, (types.MessageMediaPhoto, types.MessageMediaDocument)):
album_messages.append(msg)
logger.debug(f"Found album message {msg.id}")
if len(album_messages) >= 10 or msg.id < message.id - 20:
break
if not album_messages:
logger.warning(f"No media messages found in album {group_id}")
return
# Sort messages by ID to maintain order
album = sorted(album_messages, key=lambda x: x.id)
logger.info(f"Prepared album {group_id} with {len(album)} messages for forwarding")
# Forward the album
await self._forward_album(album, target_channel_id)
# Mark all messages as processed
self._processed_albums.add(group_id)
for msg in album:
self._processed_messages.add(msg.id)
logger.debug(f"Marked message {msg.id} as processed")
logger.info(f"Completed processing album {group_id}")
except Exception as e:
logger.error(f"Error handling album {group_id}: {str(e)}")
async def _forward_single_message(self, message: Message, target_channel_id: int) -> None:
"""Forward single message to target channel"""
try:
await self.client.forward_messages(target_channel_id, message)
await message.mark_read()
self._processed_messages.add(message.id)
logger.info(f"Forwarded message {message.id} to target channel")
except Exception as e:
logger.error(f"Error forwarding message {message.id}: {str(e)}")
async def _forward_album(self, album: List[Message], target_channel_id: int) -> None:
"""Forward album to target channel"""
try:
# Forward messages as a group
await self.client.forward_messages(
target_channel_id,
messages=album,
from_peer=album[0].chat_id
)
# Mark as read after successful forward
for message in album:
await message.mark_read()
logger.info(f"Successfully forwarded album with {len(album)} messages to target channel")
except Exception as e:
logger.error(f"Error forwarding album: {str(e)}")
logger.error(f"Album details: {len(album)} messages, first message ID: {album[0].id if album else 'unknown'}")
def clear_cache(self) -> None:
"""Clear the cache of processed messages"""
self._processed_messages.clear()
self._processed_albums.clear()