# [Fix] streaming chatbedrock converseapi (#351)
Added `run_manager` to the `_stream` method of `ChatBedrockConverse`, and added `streaming` flag handling to enable Converse streaming with `ChatBedrock`.

Fixes #334

### Using `ChatBedrock` with the `streaming` flag set to `True`:

```python
import boto3
from langchain_aws import ChatBedrock
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.prompts import ChatPromptTemplate

streaming = True
session = boto3.session.Session()
bedrock_client = session.client("bedrock-runtime", region_name="us-east-1")

# Custom handler that prints each token as it streams in.
class MyCustomHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"My custom handler, token: {token}; Type of token: {type(token)}")

prompt = ChatPromptTemplate.from_messages(["Tell me a joke about {animal}"])

model = ChatBedrock(
    client=bedrock_client,
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
    streaming=streaming,
    callbacks=[MyCustomHandler()],
)

chain = prompt | model

response = chain.invoke({"animal": "bears"})
print(response)
```


### Output:

```
(base) vishal@mypc aws % poetry run python3 streaming_demo.py
My custom handler, token: ; Type of token: <class 'str'>
My custom handler, token: Here; Type of token: <class 'str'>
My custom handler, token: 's; Type of token: <class 'str'>
My custom handler, token:  a; Type of token: <class 'str'>
My custom handler, token:  bear; Type of token: <class 'str'>
My custom handler, token:  joke; Type of token: <class 'str'>
My custom handler, token:  for; Type of token: <class 'str'>
My custom handler, token:  you; Type of token: <class 'str'>
My custom handler, token: :; Type of token: <class 'str'>
My custom handler, token: 
...
...
...
```
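
For comparison, the same tokens can be consumed without a callback handler by iterating the chain's `stream()` method directly. A minimal sketch, reusing the `chain` built above (on the non-Converse path each chunk's `content` is a plain string):

```python
# Minimal sketch: consume the stream directly instead of via callbacks.
for chunk in chain.stream({"animal": "bears"}):
    print(chunk.content, end="", flush=True)
print()
```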


### Using `ChatBedrock` with the `streaming` and `beta_use_converse_api` flags set to `True`:

```python
import boto3
from langchain_aws import ChatBedrock
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.prompts import ChatPromptTemplate

streaming = True
session = boto3.session.Session()
bedrock_client = session.client("bedrock-runtime", region_name="us-east-1")

# Custom handler that prints each token as it streams in.
class MyCustomHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"My custom handler, token: {token}; Type of token: {type(token)}")

prompt = ChatPromptTemplate.from_messages(["Tell me a joke about {animal}"])

model = ChatBedrock(
    client=bedrock_client,
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
    streaming=streaming,
    callbacks=[MyCustomHandler()],
    beta_use_converse_api=True,
)

chain = prompt | model

response = chain.invoke({"animal": "bears"})
print(response)
```

### Output:

```
(base) vishal@mypc aws % poetry run python3 streaming_demo.py
My custom handler, token: ; Type of token: <class 'str'>
My custom handler, token: Here; Type of token: <class 'str'>
My custom handler, token: 's; Type of token: <class 'str'>
My custom handler, token:  a; Type of token: <class 'str'>
My custom handler, token:  bear; Type of token: <class 'str'>
My custom handler, token:  joke; Type of token: <class 'str'>
My custom handler, token:  for; Type of token: <class 'str'>
My custom handler, token:  you; Type of token: <class 'str'>
My custom handler, token: :; Type of token: <class 'str'>
My custom handler, token: 
...
...
...

```
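
Setting `beta_use_converse_api=True` routes `ChatBedrock` through `ChatBedrockConverse` internally (see the `self._as_converse` delegation in the diff below). A hypothetical equivalent is to construct `ChatBedrockConverse` directly; the parameter names here are assumptions based on the `langchain_aws` API:

```python
from langchain_aws import ChatBedrockConverse

# Assumed equivalent of ChatBedrock(..., beta_use_converse_api=True):
# ChatBedrockConverse streams via the Bedrock Converse API natively.
converse_model = ChatBedrockConverse(
    client=bedrock_client,
    model="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
)
```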



#### Still figuring out the following:

Using `ChatBedrock` with the `stream()` method and the custom handler gives
similar output, but the same setup routed through `ChatBedrockConverse`
produces different output, as shown below for the following code:

```python
import boto3
from langchain_aws import ChatBedrock
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.prompts import ChatPromptTemplate

session = boto3.session.Session()
bedrock_client = session.client("bedrock-runtime", region_name="us-east-1")

# Custom handler that prints each token as it streams in.
class MyCustomHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"My custom handler, token: {token}; Type of token: {type(token)}")

prompt = ChatPromptTemplate.from_messages(["Tell me a joke about {animal}"])

model = ChatBedrock(
    client=bedrock_client,
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    region_name="us-east-1",
    callbacks=[MyCustomHandler()],
    beta_use_converse_api=True,
)

chain = prompt | model

# Drain the stream; the handler does the printing.
for chunk in chain.stream({"animal": "bears"}):
    pass
```

### Output:

```
My custom handler, token: []; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': 'Here', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': "'s", 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': ' a', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': ' bear', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': '-', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': 'y', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': ' goo', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': 'd joke', 'index': 0}]; Type of token: <class 'list'>
My custom handler, token: [{'type': 'text', 'text': ' for', 'index': 0}]; Type of token: <class 'list'>
...
...
...

```
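
The Converse path evidently passes the chunk's content blocks (a list of dicts) to `on_llm_new_token` rather than a plain string. As a hypothetical workaround until this is resolved, a handler can normalize both shapes:

```python
class NormalizingHandler(BaseCallbackHandler):
    """Hypothetical handler: extracts text whether the token arrives
    as a plain str or as a list of Converse content-block dicts."""

    def on_llm_new_token(self, token, **kwargs) -> None:
        if isinstance(token, list):
            # Converse-style blocks: [{'type': 'text', 'text': ..., 'index': 0}]
            text = "".join(
                block.get("text", "")
                for block in token
                if isinstance(block, dict) and block.get("type") == "text"
            )
        else:
            text = token
        print(f"My custom handler, token: {text}; Type of token: {type(text)}")
```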

---------

Co-authored-by: Vishal Patil <[email protected]>
Co-authored-by: Michael Chin <[email protected]>
3 people authored Feb 13, 2025
1 parent eb2ef6a commit 6d95e09

Showing 2 changed files with 15 additions and 5 deletions.
`libs/aws/langchain_aws/chat_models/bedrock.py` (13 changes: 9 additions & 4 deletions)

```diff
@@ -526,10 +526,15 @@ def _generate(
         **kwargs: Any,
     ) -> ChatResult:
         if self.beta_use_converse_api:
-            return self._as_converse._generate(
-                messages, stop=stop, run_manager=run_manager, **kwargs
-            )
-
+            if not self.streaming:
+                return self._as_converse._generate(
+                    messages, stop=stop, run_manager=run_manager, **kwargs
+                )
+            else:
+                stream_iter = self._as_converse._stream(
+                    messages, stop=stop, run_manager=run_manager, **kwargs
+                )
+                return generate_from_stream(stream_iter)
         completion = ""
         llm_output: Dict[str, Any] = {}
         tool_calls: List[ToolCall] = []
```
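
With this change, a `ChatBedrock` configured with `streaming=True` and `beta_use_converse_api=True` consumes `_stream` via `generate_from_stream`, so callbacks fire while `invoke()` still returns a single result. For context, here is a rough sketch of what `langchain_core`'s `generate_from_stream` does (a simplified approximation, not the actual implementation):

```python
from typing import Iterator

from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult

def generate_from_stream_sketch(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
    # Fold every chunk into one cumulative generation...
    generation = next(stream)
    for chunk in stream:
        generation += chunk
    # ...then wrap the accumulated message in a ChatResult.
    return ChatResult(
        generations=[
            ChatGeneration(
                message=generation.message,
                generation_info=generation.generation_info,
            )
        ]
    )
```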
`libs/aws/langchain_aws/chat_models/bedrock_converse.py` (7 changes: 6 additions & 1 deletion)

```diff
@@ -542,7 +542,12 @@ def _stream(
         )
         for event in response["stream"]:
             if message_chunk := _parse_stream_event(event):
-                yield ChatGenerationChunk(message=message_chunk)
+                generation_chunk = ChatGenerationChunk(message=message_chunk)
+                if run_manager:
+                    run_manager.on_llm_new_token(
+                        generation_chunk.text, chunk=generation_chunk
+                    )
+                yield generation_chunk

     # TODO: Add async support once there are async bedrock.converse methods.
```
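
This is the other half of the fix: `_stream` now reports each chunk to `run_manager.on_llm_new_token` instead of silently yielding it, which is what lets the custom handlers in the examples above receive tokens on the Converse path.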
