support receiving rate limit info (#133)
* support receiving rate limit info
- LLM callbacks added
- Anthropic support added
- OpenAI support added

* updated changelog
brainlid authored Jun 8, 2024
1 parent ceee77d commit 1b8eb20
Showing 6 changed files with 226 additions and 30 deletions.
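
In short, the diffs below add an `on_llm_ratelimit_info` LLM callback that fires with the provider's rate-limit response headers after each API call. A minimal usage sketch, assuming only the module, function, and callback names visible in the diffs below (the prompt text is illustrative):

    alias LangChain.ChatModels.ChatAnthropic
    alias LangChain.Message

    # handler map using the new callback; `headers` is a plain map of the
    # provider's rate-limit response headers
    handlers = %{
      on_llm_ratelimit_info: fn _model, headers ->
        IO.inspect(headers, label: "rate limit info")
      end
    }

    # attach the handlers to the chat model and make a call as usual
    {:ok, chat} = ChatAnthropic.new(%{callbacks: [handlers]})
    {:ok, _result} = ChatAnthropic.call(chat, [Message.new_user!("Say hello!")])

The same handler map works with `ChatOpenAI`; only the header names in the resulting map differ.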
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
* New run mode `:until_success` uses failure and retry counts to repeatedly run the chain when the LLMs responses fail a MessageProcessor.
* `LangChain.MessageProcessors.JsonProcessor` is capable of extracting JSON contents and converting it to an Elixir map using `Jason`. Parsing errors are returned to the LLM for it to try again.
* The attribute `processed_content` was added to a `LangChain.Message`. When a MessageProcessor is run on a received assistant message, the results of the processing are accumulated there. The original `content` remains unchanged for when it is sent back to the LLM and used when fixing or correcting its generated content.
* Callback support for LLM rate limit information returned in API response headers. These are currently implemented for Anthropic and OpenAI.

**Changed:**

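For context on the `:until_success` and `JsonProcessor` entries above (pre-existing changelog lines, not part of this commit), a rough sketch of how they might be combined — the `message_processors/2` and `JsonProcessor.new!/0` calls are assumptions here, since they do not appear in this diff:

    alias LangChain.Chains.LLMChain
    alias LangChain.ChatModels.ChatAnthropic
    alias LangChain.MessageProcessors.JsonProcessor
    alias LangChain.Message

    {:ok, chat} = ChatAnthropic.new(%{})

    # assumed API: attach a JSON-extracting processor and re-run the chain
    # until the processor succeeds or the retry limit is reached
    {:ok, _chain, last_message} =
      LLMChain.new!(%{llm: chat})
      |> LLMChain.message_processors([JsonProcessor.new!()])
      |> LLMChain.add_message(Message.new_user!("Reply with a JSON object."))
      |> LLMChain.run(mode: :until_success)

    # on success, the parsed map is available in `last_message.processed_content`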
65 changes: 59 additions & 6 deletions lib/chat_models/chat_anthropic.ex
@@ -9,6 +9,33 @@ defmodule LangChain.ChatModels.ChatAnthropic do
## Callbacks
See the set of available callbacks: `LangChain.ChatModels.LLMCallbacks`
### Rate Limit API Response Headers
Anthropic returns rate limit information in the response headers. Those can be
accessed using an LLM callback like this:
    handlers = %{
      on_llm_ratelimit_info: fn _model, headers ->
        IO.inspect(headers)
      end
    }

    {:ok, chat} = ChatAnthropic.new(%{callbacks: [handlers]})
When a response is received, something similar to the following will be output to the console.
    %{
      "anthropic-ratelimit-requests-limit" => ["50"],
      "anthropic-ratelimit-requests-remaining" => ["49"],
      "anthropic-ratelimit-requests-reset" => ["2024-06-08T04:28:30Z"],
      "anthropic-ratelimit-tokens-limit" => ["50000"],
      "anthropic-ratelimit-tokens-remaining" => ["50000"],
      "anthropic-ratelimit-tokens-reset" => ["2024-06-08T04:28:30Z"],
      "request-id" => ["req_1234"]
    }
"""
use Ecto.Schema
require Logger
@@ -306,7 +333,12 @@ defmodule LangChain.ChatModels.ChatAnthropic do
|> Req.post()
# parse the body and return it as parsed structs
|> case do
{:ok, %Req.Response{body: data}} ->
{:ok, %Req.Response{body: data} = response} ->
Callbacks.fire(anthropic.callbacks, :on_llm_ratelimit_info, [
anthropic,
get_ratelimit_info(response.headers)
])

case do_process_response(data) do
{:error, reason} ->
{:error, reason}
@@ -342,12 +374,14 @@ defmodule LangChain.ChatModels.ChatAnthropic do
headers: headers(get_api_key(anthropic), anthropic.api_version),
receive_timeout: anthropic.receive_timeout
)
|> Req.post(
into:
Utils.handle_stream_fn(anthropic, &decode_stream/1, &do_process_response/1)
)
|> Req.post(into: Utils.handle_stream_fn(anthropic, &decode_stream/1, &do_process_response/1))
|> case do
{:ok, %Req.Response{body: data}} ->
{:ok, %Req.Response{body: data} = response} ->
Callbacks.fire(anthropic.callbacks, :on_llm_ratelimit_info, [
anthropic,
get_ratelimit_info(response.headers)
])

data

{:error, %LangChainError{message: reason}} ->
@@ -782,4 +816,23 @@ defmodule LangChain.ChatModels.ChatAnthropic do
when is_list(content) do
item
end

defp get_ratelimit_info(response_headers) do
# extract out all the ratelimit response headers
#
# https://docs.anthropic.com/en/api/rate-limits#response-headers
{return, _} =
Map.split(response_headers, [
"anthropic-ratelimit-requests-limit",
"anthropic-ratelimit-requests-remaining",
"anthropic-ratelimit-requests-reset",
"anthropic-ratelimit-tokens-limit",
"anthropic-ratelimit-tokens-remaining",
"anthropic-ratelimit-tokens-reset",
"retry-after",
"request-id"
])

return
end
end
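
A note on the helper above: `Map.split/2` returns a `{matching, rest}` tuple, so `get_ratelimit_info/1` keeps only the headers named in its allow-list and discards the rest. A small sketch of that behavior with invented header values:

    headers = %{
      "anthropic-ratelimit-requests-limit" => ["50"],
      "content-type" => ["application/json"]
    }

    {rate_limit_info, _other_headers} =
      Map.split(headers, ["anthropic-ratelimit-requests-limit", "retry-after"])

    # rate_limit_info == %{"anthropic-ratelimit-requests-limit" => ["50"]}
    # keys missing from the map (like "retry-after" here) are simply not included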
66 changes: 60 additions & 6 deletions lib/chat_models/chat_open_ai.ex
@@ -1,6 +1,7 @@
defmodule LangChain.ChatModels.ChatOpenAI do
@moduledoc """
Represents the [OpenAI ChatModel](https://platform.openai.com/docs/api-reference/chat/create).
Represents the [OpenAI
ChatModel](https://platform.openai.com/docs/api-reference/chat/create).
Parses and validates inputs for making requests to the OpenAI Chat API.
@@ -12,6 +13,33 @@ defmodule LangChain.ChatModels.ChatOpenAI do
## Callbacks
See the set of available callbacks: `LangChain.ChatModels.LLMCallbacks`
### Rate Limit API Response Headers
OpenAI returns rate limit information in the response headers. Those can be
accessed using an LLM callback like this:
    handlers = %{
      on_llm_ratelimit_info: fn _model, headers ->
        IO.inspect(headers)
      end
    }

    {:ok, chat} = ChatOpenAI.new(%{callbacks: [handlers]})
When a response is received, something similar to the following will be output to the console.
    %{
      "x-ratelimit-limit-requests" => ["5000"],
      "x-ratelimit-limit-tokens" => ["160000"],
      "x-ratelimit-remaining-requests" => ["4999"],
      "x-ratelimit-remaining-tokens" => ["159973"],
      "x-ratelimit-reset-requests" => ["12ms"],
      "x-ratelimit-reset-tokens" => ["10ms"],
      "x-request-id" => ["req_1234"]
    }
"""
use Ecto.Schema
require Logger
@@ -448,7 +476,12 @@ defmodule LangChain.ChatModels.ChatOpenAI do
|> Req.post()
# parse the body and return it as parsed structs
|> case do
{:ok, %Req.Response{body: data}} ->
{:ok, %Req.Response{body: data} = response} ->
Callbacks.fire(openai.callbacks, :on_llm_ratelimit_info, [
openai,
get_ratelimit_info(response.headers)
])

case do_process_response(data) do
{:error, reason} ->
{:error, reason}
@@ -490,11 +523,14 @@ defmodule LangChain.ChatModels.ChatOpenAI do
receive_timeout: openai.receive_timeout
)
|> maybe_add_org_id_header()
|> Req.post(
into: Utils.handle_stream_fn(openai, &decode_stream/1, &do_process_response/1)
)
|> Req.post(into: Utils.handle_stream_fn(openai, &decode_stream/1, &do_process_response/1))
|> case do
{:ok, %Req.Response{body: data}} ->
{:ok, %Req.Response{body: data} = response} ->
Callbacks.fire(openai.callbacks, :on_llm_ratelimit_info, [
openai,
get_ratelimit_info(response.headers)
])

data

{:error, %LangChainError{message: reason}} ->
@@ -743,4 +779,22 @@ defmodule LangChain.ChatModels.ChatOpenAI do
req
end
end

defp get_ratelimit_info(response_headers) do
# extract out all the ratelimit response headers
#
# https://platform.openai.com/docs/guides/rate-limits/rate-limits-in-headers
{return, _} =
Map.split(response_headers, [
"x-ratelimit-limit-requests",
"x-ratelimit-limit-tokens",
"x-ratelimit-remaining-requests",
"x-ratelimit-remaining-tokens",
"x-ratelimit-reset-requests",
"x-ratelimit-reset-tokens",
"x-request-id"
])

return
end
end
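
Because each header value arrives as a list of strings (as in the moduledoc example above), a handler typically takes the first element and parses it. A hedged sketch of a handler that prints the remaining OpenAI request budget — the header name comes from the diff above, the rest is illustrative:

    handlers = %{
      on_llm_ratelimit_info: fn _model, info ->
        # each value is a list like ["4999"]; take the first entry if present
        with [remaining | _] <- Map.get(info, "x-ratelimit-remaining-requests", []),
             {count, _} <- Integer.parse(remaining) do
          IO.puts("OpenAI requests remaining: #{count}")
        else
          _ -> :ok
        end
      end
    }

    {:ok, chat} = ChatOpenAI.new(%{callbacks: [handlers]})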
22 changes: 21 additions & 1 deletion lib/chat_models/llm_callbacks.ex
@@ -57,11 +57,31 @@ defmodule LangChain.ChatModels.LLMCallbacks do
"""
@type llm_new_message :: (model :: struct(), Message.t() -> any())

@typedoc """
Executed when an LLM (typically a service) responds with rate limiting
information.
The specific rate limit information depends on the LLM. The callback receives a map containing all of the available information.
The return value is discarded.
## Example
A function declaration that matches the signature.
    def handle_llm_ratelimit_info(_chat_model, info) do
      IO.inspect(info)
    end
"""
@type llm_ratelimit_info :: (model :: struct(), info :: %{String.t() => any()} -> any())

@typedoc """
The supported set of callbacks for an LLM module.
"""
@type llm_callback_handler :: %{
on_llm_new_delta: llm_new_delta(),
on_llm_new_message: llm_new_message()
on_llm_new_message: llm_new_message(),
on_llm_ratelimit_info: llm_ratelimit_info()
}
end
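
Tying the types in this module together, a handler map may implement any subset of the callbacks. A sketch, assuming the delta and message callbacks receive `(model, value)` in the same shape described above and that a delta exposes a `content` field:

    handlers = %{
      on_llm_new_delta: fn _model, delta ->
        # streamed partial content; `delta.content` is assumed here
        IO.write(delta.content || "")
      end,
      on_llm_new_message: fn _model, message ->
        IO.inspect(message, label: "completed message")
      end,
      on_llm_ratelimit_info: fn _model, info ->
        IO.inspect(info, label: "rate limit headers")
      end
    }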
47 changes: 43 additions & 4 deletions test/chat_models/chat_anthropic_test.exs
@@ -386,8 +386,14 @@ defmodule LangChain.ChatModels.ChatAnthropicTest do
end

@tag live_call: true, live_anthropic: true
test "basic streamed content example" do
{:ok, chat} = ChatAnthropic.new(%{stream: true})
test "basic streamed content example and fires ratelimit callback" do
handlers = %{
on_llm_ratelimit_info: fn _model, headers ->
send(self(), {:fired_ratelimit_info, headers})
end
}

{:ok, chat} = ChatAnthropic.new(%{stream: true, callbacks: [handlers]})

{:ok, result} =
ChatAnthropic.call(chat, [
@@ -427,6 +433,20 @@ defmodule LangChain.ChatModels.ChatAnthropicTest do
role: :assistant
}
]

assert_received {:fired_ratelimit_info, info}

assert %{
"anthropic-ratelimit-requests-limit" => _,
"anthropic-ratelimit-requests-remaining" => _,
"anthropic-ratelimit-requests-reset" => _,
"anthropic-ratelimit-tokens-limit" => _,
"anthropic-ratelimit-tokens-remaining" => _,
"anthropic-ratelimit-tokens-reset" => _,
# Not always included
# "retry-after" => _,
"request-id" => _
} = info
end
end

@@ -1147,12 +1167,15 @@ data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text
end

@tag live_call: true, live_anthropic: true
test "works with NON streaming response" do
test "works with NON streaming response and fires ratelimit callback" do
test_pid = self()

handler = %{
on_llm_new_message: fn _model, message ->
send(test_pid, {:received_msg, message})
end,
on_llm_ratelimit_info: fn _model, headers ->
send(test_pid, {:fired_ratelimit_info, headers})
end
}

@@ -1167,6 +1190,20 @@ data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text

assert_received {:received_msg, data}
assert %Message{role: :assistant} = data

assert_received {:fired_ratelimit_info, info}

assert %{
"anthropic-ratelimit-requests-limit" => _,
"anthropic-ratelimit-requests-remaining" => _,
"anthropic-ratelimit-requests-reset" => _,
"anthropic-ratelimit-tokens-limit" => _,
"anthropic-ratelimit-tokens-remaining" => _,
"anthropic-ratelimit-tokens-reset" => _,
# Not always included
# "retry-after" => _,
"request-id" => _
} = info
end

@tag live_call: true, live_anthropic: true
@@ -1181,7 +1218,9 @@ data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text
}

{:ok, _result_chain, last_message} =
LLMChain.new!(%{llm: %ChatAnthropic{model: @test_model, stream: true, callbacks: [handler]}})
LLMChain.new!(%{
llm: %ChatAnthropic{model: @test_model, stream: true, callbacks: [handler]}
})
|> LLMChain.add_message(Message.new_system!("You are a helpful and concise assistant."))
|> LLMChain.add_message(Message.new_user!("Say, 'Hi!'!"))
|> LLMChain.add_message(Message.new_assistant!("Hi!"))
