Skip to content

Commit

Permalink
feat: add user profile query support for IM adapters
Browse files Browse the repository at this point in the history
- Introduced UserProfile data model with comprehensive user information
- Created UserProfileAdapter protocol for querying user profiles
- Implemented QueryUserProfileBlock for workflow-based user profile retrieval
- Added user profile query method to Telegram adapter with caching
- Supported flexible profile information extraction with fallback mechanisms
  • Loading branch information
lss233 committed Feb 4, 2025
1 parent 269bb91 commit dc80484
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 7 deletions.
13 changes: 13 additions & 0 deletions framework/im/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from framework.im.message import IMMessage
from framework.im.sender import ChatSender
from framework.llm.llm_manager import LLMManager
from .profile import UserProfile

@runtime_checkable
class EditStateAdapter(Protocol):
Expand All @@ -18,6 +19,18 @@ async def set_chat_editing_state(self, chat_sender: ChatSender, is_editing: bool
"""
pass

@runtime_checkable
class UserProfileAdapter(Protocol):
"""
用户资料查询适配器接口,定义了如何获取用户资料
"""
async def query_user_profile(self, chat_sender: ChatSender) -> UserProfile:
"""
查询用户资料
:param chat_sender: 用户的聊天发送者信息
:return: 用户资料
"""
pass

class IMAdapter(ABC):
"""
Expand Down
24 changes: 24 additions & 0 deletions framework/im/profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Optional, Any
from enum import Enum, auto
from pydantic import BaseModel, Field

class Gender(Enum):
MALE = auto()
FEMALE = auto()
UNKNOWN = auto()
OTHER = auto()

class UserProfile(BaseModel):
"""
通用的用户资料结构
"""
user_id: str = Field(..., description="用户唯一标识")
username: Optional[str] = Field(None, description="用户名")
display_name: Optional[str] = Field(None, description="显示名称")
full_name: Optional[str] = Field(None, description="完整名称")
gender: Optional[Gender] = Field(None, description="性别")
age: Optional[int] = Field(None, description="年龄")
avatar_url: Optional[str] = Field(None, description="头像URL")
level: Optional[int] = Field(None, description="用户等级")
language: Optional[str] = Field(None, description="语言")
extra_info: Optional[dict] = Field(None, description="平台特定的额外信息")
33 changes: 33 additions & 0 deletions framework/workflow/implementations/blocks/im/user_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Dict, Any, Optional
from framework.ioc.container import DependencyContainer
from framework.workflow.core.block import Block
from framework.workflow.core.workflow.input_output import Input, Output
from framework.im.sender import ChatSender
from framework.im.adapter import IMAdapter, UserProfileAdapter
from framework.im.profile import UserProfile

class QueryUserProfileBlock(Block):
def __init__(self, container: DependencyContainer):
inputs = {
"chat_sender": Input("chat_sender", ChatSender, "Chat sender to query profile for"),
"im_adapter": Input("im_adapter", IMAdapter, "IM Adapter to use for profile query", optional=True)
}
outputs = {
"profile": Output("profile", UserProfile, "User profile information")
}
super().__init__("query_user_profile", inputs, outputs)
self.container = container

def execute(self, chat_sender: ChatSender, im_adapter: Optional[IMAdapter] = None) -> Dict[str, Any]:
# 如果没有提供 im_adapter,则从容器中获取默认的
if im_adapter is None:
im_adapter = self.container.resolve(IMAdapter)

# 检查 im_adapter 是否实现了 UserProfileAdapter 协议
if not isinstance(im_adapter, UserProfileAdapter):
raise TypeError(f"IM Adapter {type(im_adapter)} does not support user profile querying")

# 同步调用异步方法(在工作流执行器中会被正确处理)
profile = im_adapter.query_user_profile(chat_sender)

return {"profile": profile}
31 changes: 27 additions & 4 deletions framework/workflow/implementations/blocks/llm/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,36 @@ def __init__(self, container: DependencyContainer, system_prompt_format: str, us

def substitute_variables(self, text: str, executor: WorkflowExecutor) -> str:
"""
替换文本中的变量占位符
替换文本中的变量占位符,支持对象属性和字典键的访问
:param text: 包含变量占位符的文本,格式为 {variable_name}
:param text: 包含变量占位符的文本,格式为 {variable_name} 或 {variable_name.attribute}
:param executor: 工作流执行器实例
:return: 替换后的文本
"""
def replace_var(match):
var_name = match.group(1)
return str(executor.get_variable(var_name, match.group(0)))
var_path = match.group(1).split('.')
var_name = var_path[0]

# 获取基础变量
value = executor.get_variable(var_name, match.group(0))

# 如果有属性/键访问
for attr in var_path[1:]:
try:
# 尝试字典键访问
if isinstance(value, dict):
value = value.get(attr, match.group(0))
# 尝试对象属性访问
elif hasattr(value, attr):
value = getattr(value, attr)
else:
# 如果无法访问,返回原始占位符
return match.group(0)
except Exception:
# 任何异常都返回原始占位符
return match.group(0)

return str(value)

return re.sub(r'\{([^}]+)\}', replace_var, text)

Expand All @@ -43,9 +64,11 @@ def execute(self, user_msg: IMMessage, memory_content: str) -> Dict[str, Any]:

# 先替换自有的两个变量
system_prompt_format = self.system_prompt_format.replace("{user_msg}", user_msg.content)
system_prompt_format = system_prompt_format.replace("{user_name}", user_msg.sender.display_name)
system_prompt_format = system_prompt_format.replace("{memory_content}", memory_content)

user_prompt_format = self.user_prompt_format.replace("{user_msg}", user_msg.content)
user_prompt_format = user_prompt_format.replace("{user_name}", user_msg.sender.display_name)
user_prompt_format = user_prompt_format.replace("{memory_content}", memory_content)

# 再替换其他变量
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def create_default_workflow(container: DependencyContainer) -> Workflow:
接下来,请你扮演以上的角色,与用户继续交流。
""".strip()

user_prompt = """{user_msg}"""
user_prompt = """{user_name}说:{user_msg}"""

return (WorkflowBuilder("default_workflow", container)
.use(GetIMMessage, name="get_message")
Expand Down
60 changes: 58 additions & 2 deletions plugins/im_telegram_adapter/adapter.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import asyncio
import random
from typing import Any
from functools import lru_cache
from telegram import Update, User
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters
from framework.im.adapter import IMAdapter
from framework.im.adapter import EditStateAdapter, IMAdapter, UserProfileAdapter
from framework.im.message import IMMessage, TextMessage, VoiceMessage, ImageMessage
from framework.im.sender import ChatSender, ChatType
from framework.logger import get_logger
from framework.workflow.core.dispatch import WorkflowDispatcher
from pydantic import ConfigDict, BaseModel, Field
import telegramify_markdown
from framework.im.profile import UserProfile, Gender

def get_display_name(user: User):
if user.username:
Expand All @@ -30,7 +32,7 @@ class TelegramConfig(BaseModel):
def __repr__(self):
return f"TelegramConfig(token={self.token})"

class TelegramAdapter(IMAdapter):
class TelegramAdapter(IMAdapter, UserProfileAdapter, EditStateAdapter):
"""
Telegram Adapter,包含 Telegram Bot 的所有逻辑。
"""
Expand Down Expand Up @@ -167,3 +169,57 @@ async def set_chat_editing_state(self, chat_sender: ChatSender, is_editing: bool
except Exception as e:
self.logger.warning(f"Failed to set chat editing state: {str(e)}")

@lru_cache(maxsize=10)
async def _cached_get_chat(self, user_id):
"""
带缓存的获取用户信息方法
:param user_id: 用户ID
:return: 用户对象
"""
return await self.application.bot.get_chat(user_id)

async def query_user_profile(self, chat_sender: ChatSender) -> UserProfile:
"""
查询 Telegram 用户资料
:param chat_sender: 用户的聊天发送者信息
:return: 用户资料
"""
try:
# 获取用户 ID
user_id = chat_sender.user_id
# 获取用户对象(使用缓存)
user = await self._cached_get_chat(user_id)

# 确定性别
gender = Gender.UNKNOWN
if hasattr(user, 'gender'):
if user.gender == 'male':
gender = Gender.MALE
elif user.gender == 'female':
gender = Gender.FEMALE

# 构建用户资料
profile = UserProfile(
user_id=str(user_id),
username=user.username,
display_name=get_display_name(user),
full_name=f"{user.first_name or ''} {user.last_name or ''}".strip(),
gender=gender,
avatar_url=None, # Telegram 需要额外处理获取头像
language=user.language_code,
extra_info={
'is_bot': user.is_bot,
'is_premium': getattr(user, 'is_premium', False)
}
)

return profile

except Exception as e:
self.logger.warning(f"Failed to query user profile: {str(e)}")
# 返回部分信息
return UserProfile(
user_id=str(chat_sender.user_id),
display_name=chat_sender.display_name
)

0 comments on commit dc80484

Please sign in to comment.