Retried streamed responses (ChatOpenAI) leak content from failed, retried requests #247

Open
ci opened this issue Feb 5, 2025 · 2 comments

ci commented Feb 5, 2025

hey folks, I noticed some weirdness where a streamed response from OpenAI abruptly cuts off and then gets regenerated (last tested on b349f18, but it doesn't look like the relevant code has changed since).

I've looked through the logic a bit, and I think that when the connection gets closed, the deltas that were already applied 'leak' into the retried response because the apply just keeps going. I repro'd it with the two scripts below (written with o3-mini's help from a valid example captured via CF's AI gateway, so hopefully it didn't botch the format):

client (client.exs):
# client.exs
Mix.install([:req, :jason, :langchain])

alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Message
alias LangChain.Chains.LLMChain

defmodule TestClient do
  def run do
    # Create a ChatOpenAI struct with our fake endpoint and streaming enabled.
    llm =
      ChatOpenAI.new!(%{
        endpoint: "http://localhost:31337/v1/chat/completions",
        stream: true,
        model: "gpt-4o",
        frequency_penalty: 0,
        temperature: 1,
        callbacks: []
      })

    # Build an LLMChain with a simple user message "test"
    chain =
      %{llm: llm}
      |> LLMChain.new!()
      |> LLMChain.add_messages([Message.new_user!("test")])

    updated_chain = LLMChain.run(chain, mode: :while_needs_response)

    {:ok, %LLMChain{last_message: %Message{content: content}}} = updated_chain

    IO.inspect(content, label: "CONTENT", printable_limit: :infinity)
  end
end

TestClient.run()
fake server that closes the connection mid-stream (fake_server.exs):
# fake_server.exs
Mix.install([:plug_cowboy, :jason, :plug])

require Logger

defmodule FakeOpenAIServer do
  use Plug.Router

  plug(:match)
  plug(:dispatch)

  # Start an Agent to track call counts
  def start_agent do
    Agent.start_link(fn -> 0 end, name: __MODULE__.CallCounter)
  end

  post "/v1/chat/completions" do
    call_count =
      Agent.get_and_update(__MODULE__.CallCounter, fn count ->
        {count + 1, count + 1}
      end)

    Logger.info("Call count: #{call_count}")

    {:ok, body, conn} = Plug.Conn.read_body(conn)
    Logger.info("Request body: #{body}")
    conn = Plug.Conn.put_resp_content_type(conn, "application/json")
    conn = Plug.Conn.send_chunked(conn, 200)

    # Define our fake streamed chunks.
    chunks = [
      %{
        "nonce" => "nonce1",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [
          %{
            "index" => 0,
            "delta" => %{"content" => "It", "role" => "assistant"},
            "finish_reason" => nil
          }
        ]
      },
      %{
        "nonce" => "nonce2",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [
          %{"index" => 0, "delta" => %{"content" => " looks"}, "finish_reason" => nil}
        ]
      },
      %{
        "nonce" => "nonce3",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [%{"index" => 0, "delta" => %{"content" => " like"}, "finish_reason" => nil}]
      },
      %{
        "nonce" => "nonce4",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [
          %{"index" => 0, "delta" => %{"content" => " you're"}, "finish_reason" => nil}
        ]
      },
      %{
        "nonce" => "nonce5",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [
          %{"index" => 0, "delta" => %{"content" => " performing"}, "finish_reason" => nil}
        ]
      },
      %{
        "nonce" => "nonce6",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [%{"index" => 0, "delta" => %{"content" => " a"}, "finish_reason" => nil}]
      },
      %{
        "nonce" => "nonce7",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [%{"index" => 0, "delta" => %{"content" => " test"}, "finish_reason" => nil}]
      },
      %{
        "nonce" => "nonce8",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [%{"index" => 0, "delta" => %{"content" => "."}, "finish_reason" => nil}]
      },
      # Final chunk: finish_reason set to "stop"
      %{
        "nonce" => "nonce9",
        "id" => "chatcmpl-XYZ",
        "object" => "chat.completion.chunk",
        "created" => 123,
        "model" => "gpt-4o",
        "choices" => [%{"index" => 0, "delta" => %{}, "finish_reason" => "stop"}]
      }
    ]

    if call_count == 1 do
      # On the first call, send only half of the chunks then simulate a transport error.
      half = div(length(chunks), 2)

      Enum.take(chunks, half)
      |> Enum.each(fn chunk ->
        chunk_json = Jason.encode!(chunk) <> "\n\n"

        Plug.Conn.chunk(conn, chunk_json)

        Process.sleep(100)
      end)

      Logger.info("Simulating transport error after partial stream")
      raise "Simulated transport error"
    else
      # On subsequent calls, send the full stream.
      Enum.each(chunks, fn chunk ->
        chunk_json = Jason.encode!(chunk) <> "\n\n"

        Plug.Conn.chunk(conn, chunk_json)

        Process.sleep(100)
      end)

      conn
    end
  end

  match _ do
    send_resp(conn, 404, "Not found")
  end
end

FakeOpenAIServer.start_agent()
Plug.Cowboy.http(FakeOpenAIServer, [], port: 31337)
Logger.info("Fake OpenAI server running on http://localhost:31337")
Process.sleep(:infinity)

output:

❯ elixir client.exs
CONTENT: "It looks like you'reIt looks like you're performing a test."

I'm using mode: :while_needs_response; could this be happening because the delta isn't reset to nil on the chain when the request gets retried? 🤔
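
To illustrate what I suspect is happening (a toy accumulator only, not the real LLMChain internals): if the partial content applied from the failed first request is never cleared, the retried request's deltas get appended on top of it, which reproduces the doubled output above.

# delta_leak_sketch.exs - hypothetical illustration, no langchain involved
defmodule DeltaLeakSketch do
  # Merge streamed delta fragments into an accumulated string.
  def merge(acc, deltas), do: Enum.reduce(deltas, acc, fn delta, acc -> acc <> delta end)

  def run do
    # First attempt dies halfway through, like the fake server above.
    attempt1 = ["It", " looks", " like", " you're"]
    # The retried attempt streams the full message.
    attempt2 = ["It", " looks", " like", " you're", " performing", " a", " test", "."]

    ""
    |> merge(attempt1)  # partial content stays on the accumulator ("chain")
    |> merge(attempt2)  # the retry appends instead of starting from nil
    |> IO.inspect(label: "LEAKED")
    # => "It looks like you'reIt looks like you're performing a test."
  end
end

DeltaLeakSketch.run()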

brainlid (Owner) commented Feb 6, 2025

Hi @ci! I've wondered what the "right" way to handle this is. For example, if I'm generating a response in an interactive UI and it's going in a direction I don't want, I can cancel it. Should the chain keep the partially received message and close out the delta as a completed message? Or should it nil it?

I think the 'nil' approach works better for many different use cases.
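
To make the 'nil' idea concrete from the caller's side, something like this (rough sketch only: it assumes the in-progress message sits in the chain's delta field and that run/2 returns the three-element {:error, chain, reason} tuple, so treat the exact names as assumptions rather than documented API):

# Continuing from the client.exs setup above:
case LLMChain.run(chain, mode: :while_needs_response) do
  {:ok, updated_chain} ->
    {:ok, updated_chain}

  {:error, failed_chain, reason} ->
    # Drop the partially received delta so a later retry starts from a
    # clean slate instead of appending to the leftover content.
    {:error, %LLMChain{failed_chain | delta: nil}, reason}
end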

ci (Author) commented Feb 11, 2025

ahhh my bad @brainlid - I initially thought the culprit was this retry on transport close:

{:error, %Req.TransportError{reason: :closed}} ->
  # Force a retry by making a recursive call decrementing the counter
  Logger.debug(fn -> "Mint connection closed: retry count = #{inspect(retry_count)}" end)
  do_api_request(openai, messages, tools, retry_count - 1)
but looking again, I just realized it's because I'm running mode: :while_needs_response, and that's what retries the request.

I agree with you, I think nil makes more sense, especially for the :while_needs_response case (where 'canceling' shouldn't really 're-run' the request, since canceling actually means you don't "want" a response anymore 🤔).
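
For the canceling case I mean something like the following (again just a caller-side sketch, with the same assumption that the leftover partial message is on the chain's delta field): when the stream is cut off, clear the partial delta and stop, rather than letting :while_needs_response re-run the request.

# Continuing from the client.exs setup above:
case LLMChain.run(chain, mode: :while_needs_response) do
  {:ok, updated_chain} ->
    {:ok, updated_chain}

  {:error, %LLMChain{delta: partial} = failed_chain, _reason} when not is_nil(partial) ->
    # Stream cut off mid-message: drop the partial delta and treat it as
    # cancelled instead of re-running the request.
    {:cancelled, %LLMChain{failed_chain | delta: nil}}

  {:error, failed_chain, reason} ->
    {:error, failed_chain, reason}
end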
