From 6d95e0956a93c6141b97c4cdcddaa709b49648b1 Mon Sep 17 00:00:00 2001 From: Vishal Ankush Patil <99629414+patilvishal0597@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:35:36 -0800 Subject: [PATCH] [Fix] streaming chatbedrock converseapi (#351) Added run_manager in stream method of ChatBedrockConverse. Added streaming flag handling to enable converse streaming with ChatBedrock Fixes #334 ### Using `ChatBedrock` with `streaming` flag set `True`: ```python import boto3 from langchain_aws import ChatBedrock, ChatBedrockConverse from langchain.callbacks.base import BaseCallbackHandler from langchain_core.prompts import ChatPromptTemplate streaming = True session = boto3.session.Session() bedrock_client = session.client("bedrock-runtime", region_name="us-east-1") class MyCustomHandler(BaseCallbackHandler): def on_llm_new_token(self, token: str, **kwargs) -> None: print(f"My custom handler, token: {token}; Type of token: {type(token)}") prompt = ChatPromptTemplate.from_messages(["Tell me a joke about {animal}"]) model = ChatBedrock( client=bedrock_client, model_id="anthropic.claude-3-haiku-20240307-v1:0", region_name="us-east-1", streaming = streaming, callbacks=[MyCustomHandler()], ) chain = prompt | model response = chain.invoke({"animal": "bears"}) print(response) ``` ### Output: ``` (base) vishal@mypc aws % poetry run python3 streaming_demo.py My custom handler, token: ; Type of token: <class 'str'> My custom handler, token: Here; Type of token: <class 'str'> My custom handler, token: 's; Type of token: <class 'str'> My custom handler, token: a; Type of token: <class 'str'> My custom handler, token: bear; Type of token: <class 'str'> My custom handler, token: joke; Type of token: <class 'str'> My custom handler, token: for; Type of token: <class 'str'> My custom handler, token: you; Type of token: <class 'str'> My custom handler, token: :; Type of token: <class 'str'> My custom handler, token: ... ... ... 
``` ### Using `ChatBedrock` with `streaming` and `beta_use_converse_api` flags set as `True`: ```python import boto3 from langchain_aws import ChatBedrock, ChatBedrockConverse from langchain.callbacks.base import BaseCallbackHandler from langchain_core.prompts import ChatPromptTemplate streaming = True session = boto3.session.Session() bedrock_client = session.client("bedrock-runtime", region_name="us-east-1") class MyCustomHandler(BaseCallbackHandler): def on_llm_new_token(self, token: str, **kwargs) -> None: print(f"My custom handler, token: {token}; Type of token: {type(token)}") prompt = ChatPromptTemplate.from_messages(["Tell me a joke about {animal}"]) model = ChatBedrock( client=bedrock_client, model_id="anthropic.claude-3-haiku-20240307-v1:0", region_name="us-east-1", streaming = streaming, callbacks=[MyCustomHandler()], beta_use_converse_api = True ) chain = prompt | model response = chain.invoke({"animal": "bears"}) print(response) ``` ### Output: ``` (base) vishal@mypc aws % poetry run python3 streaming_demo.py My custom handler, token: ; Type of token: <class 'str'> My custom handler, token: Here; Type of token: <class 'str'> My custom handler, token: 's; Type of token: <class 'str'> My custom handler, token: a; Type of token: <class 'str'> My custom handler, token: bear; Type of token: <class 'str'> My custom handler, token: joke; Type of token: <class 'str'> My custom handler, token: for; Type of token: <class 'str'> My custom handler, token: you; Type of token: <class 'str'> My custom handler, token: :; Type of token: <class 'str'> My custom handler, token: ... ... ... ``` #### Still figuring out the following: Using `ChatBedrock` with the `stream()` method and the custom handler gives similar output. 
But the same with `ChatBedrockConverse` gives a different output as shown below for the given code: ```python import boto3 from langchain_aws import ChatBedrock, ChatBedrockConverse from langchain.callbacks.base import BaseCallbackHandler from langchain_core.prompts import ChatPromptTemplate streaming = True session = boto3.session.Session() bedrock_client = session.client("bedrock-runtime", region_name="us-east-1") class MyCustomHandler(BaseCallbackHandler): def on_llm_new_token(self, token: str, **kwargs) -> None: print(f"My custom handler, token: {token}; Type of token: {type(token)}") prompt = ChatPromptTemplate.from_messages(["Tell me a joke about {animal}"]) model = ChatBedrock( client=bedrock_client, model_id="anthropic.claude-3-haiku-20240307-v1:0", region_name="us-east-1", callbacks=[MyCustomHandler()], beta_use_converse_api = True ) chain = prompt | model for chunk in chain.stream({"animal": "bears"}): pass ``` ### Output: ``` My custom handler, token: []; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': 'Here', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': "'s", 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': ' a', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': ' bear', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': '-', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': 'y', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': ' goo', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': 'd joke', 'index': 0}]; Type of token: <class 'list'> My custom handler, token: [{'type': 'text', 'text': ' for', 'index': 0}]; Type of token: <class 'list'> ... ... ... 
``` --------- Co-authored-by: Vishal Patil Co-authored-by: Michael Chin --- libs/aws/langchain_aws/chat_models/bedrock.py | 13 +++++++++---- .../langchain_aws/chat_models/bedrock_converse.py | 7 ++++++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/libs/aws/langchain_aws/chat_models/bedrock.py b/libs/aws/langchain_aws/chat_models/bedrock.py index ca9cfedd..43226299 100644 --- a/libs/aws/langchain_aws/chat_models/bedrock.py +++ b/libs/aws/langchain_aws/chat_models/bedrock.py @@ -526,10 +526,15 @@ def _generate( **kwargs: Any, ) -> ChatResult: if self.beta_use_converse_api: - return self._as_converse._generate( - messages, stop=stop, run_manager=run_manager, **kwargs - ) - + if not self.streaming: + return self._as_converse._generate( + messages, stop=stop, run_manager=run_manager, **kwargs + ) + else: + stream_iter = self._as_converse._stream( + messages, stop=stop, run_manager=run_manager, **kwargs + ) + return generate_from_stream(stream_iter) completion = "" llm_output: Dict[str, Any] = {} tool_calls: List[ToolCall] = [] diff --git a/libs/aws/langchain_aws/chat_models/bedrock_converse.py b/libs/aws/langchain_aws/chat_models/bedrock_converse.py index 239100d9..efbae713 100644 --- a/libs/aws/langchain_aws/chat_models/bedrock_converse.py +++ b/libs/aws/langchain_aws/chat_models/bedrock_converse.py @@ -542,7 +542,12 @@ def _stream( ) for event in response["stream"]: if message_chunk := _parse_stream_event(event): - yield ChatGenerationChunk(message=message_chunk) + generation_chunk = ChatGenerationChunk(message=message_chunk) + if run_manager: + run_manager.on_llm_new_token( + generation_chunk.text, chunk=generation_chunk + ) + yield generation_chunk # TODO: Add async support once there are async bedrock.converse methods.