LangchainProcessor can't use custom invocation parameters as config #335

Open
agilebean opened this issue Aug 1, 2024 · 2 comments

@agilebean

agilebean commented Aug 1, 2024

This issue is strategically important for me: it decides whether I can adopt pipecat for the long term.

pipecat:LangchainProcessor wraps langchain:RunnableWithMessageHistory. I need to replace session_id with two fields, user_id and conversation_id, as shown in the Langchain documentation.

My code

        chain_runnable = RunnableWithMessageHistory(
            prompted_chat_model,
            # lambda session_id: get_session_history(database_label, session_id),
            lambda session_id: get_session_history(database_label, user_id, conversation_id),
            history_messages_key="chat_history",
            input_messages_key="input",
            **memory_kwargs,
            history_factory_config=[
                ConfigurableFieldSpec(
                    id="user_id",
                    annotation=str,
                    name="User ID",
                    description="Unique identifier for the user.",
                    default="",
                    is_shared=True,
                ),
                ConfigurableFieldSpec(
                    id="conversation_id",
                    annotation=str,
                    name="Conversation ID",
                    description="Unique identifier for the conversation.",
                    default="",
                    is_shared=True,
                ),
            ],
        )

Error

The above code throws

ValueError: Missing keys ['conversation_id', 'user_id'] in config['configurable'] Expected keys are ['conversation_id', 'user_id'].When using via .invoke() or .stream(), pass in a config; e.g., chain.invoke({'input': 'foo'}, {'configurable': {'conversation_id': '[your-value-here]', 'user_id': '[your-value-here]'}})

I tried changing the pipecat code in langchain.py to replace session_id as follows:

    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                # config={"configurable": {"session_id": self._participant_id}},
                config={"configurable": {
                    'user_id': self._user_id,
                    'conversation_id': self._conversation_id
                }},
            ):
                await self.push_frame(TextFrame(self.__get_token_value(t

but this throws

ValueError: Expected keys ['conversation_id', 'user_id'] do not match parameter names ['session_id'] of get_session_history. 
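
The wording of this second error suggests that the parameter names of the history factory are compared against the ids in history_factory_config, and the lambda in "My code" still declares a single session_id parameter. The standard-library sketch below only imitates such a comparison (it is not LangChain's code, and check_factory is a hypothetical helper), to show that a factory taking user_id and conversation_id would pass it.

```python
import inspect


def check_factory(factory, expected_keys):
    """Hypothetical stand-in for the name check implied by the error message."""
    parameter_names = list(inspect.signature(factory).parameters)
    if set(parameter_names) != set(expected_keys):
        raise ValueError(
            f"Expected keys {sorted(expected_keys)} do not match "
            f"parameter names {sorted(parameter_names)} of get_session_history."
        )
    return parameter_names


def get_session_history(database_label, user_id, conversation_id):
    # Placeholder: a real implementation would return a chat history object.
    return (database_label, user_id, conversation_id)


database_label = "demo"
expected_keys = ["user_id", "conversation_id"]

# Same signature as the lambda in the issue: one parameter named session_id.
old_factory = lambda session_id: None

# Parameter names match the ConfigurableFieldSpec ids.
new_factory = lambda user_id, conversation_id: get_session_history(
    database_label, user_id, conversation_id
)

try:
    check_factory(old_factory, expected_keys)
    old_factory_ok = True
except ValueError:
    old_factory_ok = False

new_factory_params = check_factory(new_factory, expected_keys)
```

If that reading is right, renaming the lambda's parameters may be enough on the LangChain side, independent of how pipecat passes the config.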

I could extend that to accept the custom parameters, but I was hoping for a more general solution that established pipecat contributors like @TomTom101 might come up with.

Langchain code

As I understand the langchain code, this is how the custom config can be passed:
langchain_core/runnables/base.py, line 5242, in astream():

self._merge_configs(config)

which calls langchain_core/runnables/history.py, lines 533ff:

expected_keys = [field_spec.id for field_spec in self.history_factory_config]
...
            message_history = self.get_session_history(
                **{key: configurable[key] for key in expected_keys}
            )
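
To make the quoted lines concrete, here is a small stand-alone imitation of that lookup. FieldSpec and resolve_history are hypothetical stand-ins, not LangChain classes; the point is only that every id listed in the field specs must be present in config['configurable'] and is then forwarded as a keyword argument.

```python
from dataclasses import dataclass


@dataclass
class FieldSpec:
    """Hypothetical stand-in that keeps only the id of a field spec."""
    id: str


def resolve_history(get_session_history, history_factory_config, config):
    configurable = config.get("configurable", {})
    expected_keys = [spec.id for spec in history_factory_config]
    missing = sorted(set(expected_keys) - set(configurable))
    if missing:
        raise ValueError(f"Missing keys {missing} in config['configurable']")
    # Forward exactly the expected keys as keyword arguments.
    return get_session_history(**{key: configurable[key] for key in expected_keys})


def get_session_history(user_id, conversation_id):
    # Placeholder return value instead of a real message history.
    return f"{user_id}/{conversation_id}"


specs = [FieldSpec("user_id"), FieldSpec("conversation_id")]

history = resolve_history(
    get_session_history,
    specs,
    {"configurable": {"user_id": "123", "conversation_id": "1"}},
)

try:
    # A config that only carries session_id lacks both expected keys.
    resolve_history(get_session_history, specs, {"configurable": {"session_id": "abc"}})
    missing_error = None
except ValueError as exc:
    missing_error = str(exc)
```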

Pipecat code

So in pipecat/processors/frame_processor.py, the _ainvoke method passes only session_id:

    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                config={"configurable": {"session_id": self._participant_id}},
            ):

Suggested Solution

So could _ainvoke be extended to pass the keys from langchain's self.history_factory_config?
Maybe something along the lines of this snippet:

    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        await self.push_frame(LLMFullResponseStartFrame())

        if hasattr(self, '_configurable'):
            configurable = self._configurable
        else:
            configurable = {"configurable": {"session_id": self._participant_id}}
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                config=configurable
            ):
                await self.push_frame(TextFrame(self.__get_token_value(token)))
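
A runnable sketch of this idea, using stand-ins so that it works without pipecat or LangChain installed. FakeChain and SketchProcessor are hypothetical, and _configurable is assumed to hold the full config dict, as in the snippet above.

```python
import asyncio


class FakeChain:
    """Hypothetical stand-in for the wrapped runnable; records each config it receives."""

    def __init__(self):
        self.seen_configs = []

    async def astream(self, inputs, config=None):
        self.seen_configs.append(config)
        for token in inputs["input"].split():
            yield token


class SketchProcessor:
    """Hypothetical stand-in for the processor; only the config fallback is modelled."""

    def __init__(self, chain, participant_id):
        self._chain = chain
        self._participant_id = participant_id
        self._transcript_key = "input"

    def _invoke_config(self):
        # Use the custom config if one was set, else fall back to session_id.
        default = {"configurable": {"session_id": self._participant_id}}
        return getattr(self, "_configurable", default)

    async def ainvoke(self, text):
        stream = self._chain.astream(
            {self._transcript_key: text}, config=self._invoke_config()
        )
        return [token async for token in stream]


chain = FakeChain()
processor = SketchProcessor(chain, participant_id="p-1")

default_tokens = asyncio.run(processor.ainvoke("hello there"))

processor._configurable = {"configurable": {"user_id": "123", "conversation_id": "1"}}
custom_tokens = asyncio.run(processor.ainvoke("hi"))
```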

Or is there another way to pass the config dict for the new parameters to LangchainProcessor?

@agilebean changed the title from "LangchainProcessor can't use" to "LangchainProcessor can't use custom invocation parameters as config" on Aug 1, 2024
@TomTom101
Contributor

Thanks! Try this branch and set your configurables like this:

config = {"user_id": "123", "conversation_id": "1"}
lc.set_configurable(config)

session_id is a configurable in its own right and should not be hard-coded anyway.

@agilebean
Author

@TomTom101 I tried this, but it didn't work.
Doesn't the invoke method need to access it like this?

    async def _ainvoke(self, text: dict):
...
        if hasattr(self, '_configurable'):
            configurable = self._configurable
        else:
            configurable = {"configurable": {"session_id": self._participant_id}}
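
One thing that may be worth checking here, stated as an assumption because the branch's code is not shown in this thread: set_configurable above receives a flat dict, while this snippet passes self._configurable straight through as config, where the keys are expected under "configurable". A hypothetical helper (as_invoke_config is not part of pipecat) that accepts either shape could look like this:

```python
def as_invoke_config(value):
    """Hypothetical helper: nest a flat dict under 'configurable' unless already nested."""
    if "configurable" in value:
        # Already in the shape that astream(..., config=...) expects.
        return value
    return {"configurable": dict(value)}


flat = {"user_id": "123", "conversation_id": "1"}
wrapped = {"configurable": {"user_id": "123", "conversation_id": "1"}}

from_flat = as_invoke_config(flat)
from_wrapped = as_invoke_config(wrapped)
```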
