hey folks, noticed some weirdness where a streamed response from OpenAI abruptly cuts off and then gets regenerated (last tested on b349f18, but it doesn't look like the relevant logic has changed in the meantime)

I've looked a bit through the logic, and I think the connection getting closed 'leaks' the previous messages by somehow continuing the apply. I repro'd it (thanks to o3-mini, based on a valid example from CF's AI gateway; hopefully it didn't botch the format) with the following:
client
```elixir
# client.exs
Mix.install([:req, :jason, :langchain])

alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Message
alias LangChain.Chains.LLMChain

defmodule TestClient do
  def run do
    # Create a ChatOpenAI struct with our fake endpoint and streaming enabled.
    llm =
      ChatOpenAI.new!(%{
        endpoint: "http://localhost:31337/v1/chat/completions",
        stream: true,
        model: "gpt-4o",
        frequency_penalty: 0,
        temperature: 1,
        callbacks: []
      })

    # Build an LLMChain with a simple user message "test".
    chain =
      %{llm: llm}
      |> LLMChain.new!()
      |> LLMChain.add_messages([Message.new_user!("test")])

    updated_chain = LLMChain.run(chain, mode: :while_needs_response)

    {:ok, %LLMChain{last_message: %Message{content: content}}} = updated_chain

    IO.inspect(content, label: "CONTENT", printable_limit: :infinity)
  end
end

TestClient.run()
```
fake server closing connection
```elixir
# fake_server.exs
Mix.install([:plug_cowboy, :jason, :plug])

require Logger

defmodule FakeOpenAIServer do
  use Plug.Router

  plug(:match)
  plug(:dispatch)

  # Start an Agent to track call counts
  def start_agent do
    Agent.start_link(fn -> 0 end, name: __MODULE__.CallCounter)
  end

  post "/v1/chat/completions" do
    call_count =
      Agent.get_and_update(__MODULE__.CallCounter, fn count -> {count + 1, count + 1} end)

    Logger.info("Call count: #{call_count}")

    {:ok, body, conn} = Plug.Conn.read_body(conn)
    Logger.info("Request body: #{body}")

    conn = Plug.Conn.put_resp_content_type(conn, "application/json")
    conn = Plug.Conn.send_chunked(conn, 200)

    # Define our fake streamed chunks.
    chunks = [
      %{"nonce" => "nonce1", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => "It", "role" => "assistant"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce2", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => " looks"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce3", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => " like"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce4", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => " you're"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce5", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => " performing"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce6", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => " a"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce7", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => " test"}, "finish_reason" => nil}]},
      %{"nonce" => "nonce8", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{"content" => "."}, "finish_reason" => nil}]},
      # Final chunk: finish_reason set to "stop"
      %{"nonce" => "nonce9", "id" => "chatcmpl-XYZ", "object" => "chat.completion.chunk", "created" => 123, "model" => "gpt-4o", "choices" => [%{"index" => 0, "delta" => %{}, "finish_reason" => "stop"}]}
    ]

    if call_count == 1 do
      # On the first call, send only half of the chunks then simulate a transport error.
      half = div(length(chunks), 2)

      chunks
      |> Enum.take(half)
      |> Enum.each(fn chunk ->
        chunk_json = Jason.encode!(chunk) <> "\n\n"
        Plug.Conn.chunk(conn, chunk_json)
        Process.sleep(100)
      end)

      Logger.info("Simulating transport error after partial stream")
      raise "Simulated transport error"
    else
      # On subsequent calls, send the full stream.
      Enum.each(chunks, fn chunk ->
        chunk_json = Jason.encode!(chunk) <> "\n\n"
        Plug.Conn.chunk(conn, chunk_json)
        Process.sleep(100)
      end)

      conn
    end
  end

  match _ do
    send_resp(conn, 404, "Not found")
  end
end

FakeOpenAIServer.start_agent()
Plug.Cowboy.http(FakeOpenAIServer, [], port: 31337)
Logger.info("Fake OpenAI server running on http://localhost:31337")
Process.sleep(:infinity)
```
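to reproduce: run `elixir fake_server.exs` in one terminal, then `elixir client.exs` in another. the first request dies halfway through the stream, the chain retries, and the duplicated content shows up in the final message: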
output:
```
❯ elixir client.exs
CONTENT: "It looks like you'reIt looks like you're performing a test."
```
I'm using `mode: :while_needs_response`; this might be happening because `delta` isn't reset to `nil` on the chain between retries, maybe? 🤔
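for clarity, here's a minimal sketch of the failure mode I *suspect* (this is my mental model, not the library's actual internals): the content accumulated from the failed first call is never cleared, so the retried stream's content lands on top of it:

```elixir
# Sketch of the suspected mechanism (assumed, not LangChain's real code):
# content accumulated from the 4 chunks delivered before the transport error...
leftover = "It looks like you're"

# ...plus the full content streamed on the retried second call:
retried = "It looks like you're performing a test."

# If the leftover partial isn't reset before the retry, merging produces
# exactly the duplicated output shown above.
IO.puts(leftover <> retried)
# => "It looks like you'reIt looks like you're performing a test."
```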
Hi @ci! I've wondered what the "right" way to handle this is. Like, if I'm generating a response in an interactive UI and it's going in a direction I don't want, I can cancel it. Should it keep the partially received message and close out the delta to be completed? Or should it nil it?
I think the 'nil' approach works better for many different use cases.
> retrying when the transport is closed

just realized, looking again, it's because I'm running `mode: :while_needs_response` and that's what retries it.
I agree with you; I think `nil` makes more sense, especially for the `:while_needs_response` case (where 'canceling' shouldn't really 're-run' it, since canceling actually means you don't "want" a response anymore 🤔)
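in the meantime, a minimal workaround sketch, assuming the partial message really does accumulate in the chain's `delta` field (the field name comes from my hypothesis above, not from a confirmed fix): clear it before re-running, so a retried run starts from a clean slate:

```elixir
# Hypothetical workaround, in the context of client.exs above; `chain` is the
# LLMChain built there. Assumes the partially received message lives in the
# chain's :delta field.
alias LangChain.Chains.LLMChain

# Drop any leftover partial delta before retrying.
chain = %LLMChain{chain | delta: nil}
{:ok, updated_chain} = LLMChain.run(chain, mode: :while_needs_response)
```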