Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling non-JSON payloads with asDecodedServerSentEventsWithJSONData() #622

Closed
paulhdk opened this issue Sep 12, 2024 · 14 comments · Fixed by #625
Closed

Handling non-JSON payloads with asDecodedServerSentEventsWithJSONData() #622

paulhdk opened this issue Sep 12, 2024 · 14 comments · Fixed by #625
Labels
kind/support Adopter support requests. status/triage Collecting information required to triage the issue.

Comments

@paulhdk
Copy link
Contributor

paulhdk commented Sep 12, 2024

Question

Hi!

I'm generating code with the OpenAI OpenAPI specification and am iterating over streamed responses from the chat/completions endpoint with asDecodedServerSentEventsWithJSONData().

The problem is that API returns ChatCompletionStreamResponse JSON payloads until the stream is done, which is denoted by a simple [DONE] payload.

Because [DONE] isn't valid JSON, the last chunk of a successful stream will throw a DecodingError.dataCorrupted().

Currently, I'm just handling it with an empty catch block to avoid the crash, but that seems like a hacky workaround. So, I was wondering whether there was a better solution.

The following ideas haven't been successful so far:

  • Use a ClientMiddleware to match incoming payloads for the [DONE] bytes and toss the payload if they're found.
  • Add another schema to the OpenAPI spec that defines [DONE] and is connect with the ChatCompletionStreamResponse through an anyOf.

Do you have any suggestions?

@paulhdk paulhdk added kind/support Adopter support requests. status/triage Collecting information required to triage the issue. labels Sep 12, 2024
@simonjbeaumont
Copy link
Collaborator

simonjbeaumont commented Sep 12, 2024

I've just downloaded the latest OpenAPI document for OpenAI, and I think we might need some changes. It currently provides application/json as the content-type regardless of whether the streaming parameter is set or not:

            responses:
                "200":
                    description: OK
                    content:
                        application/json:
                            schema:
                                $ref: "#/components/schemas/CreateChatCompletionResponse"

It returns the following body:

data: {"id":"chatcmpl-A6aNCrs7LgMTsQlex5k1tdPIDfPfg","object":"chat.completion.chunk","created":1726133150,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_483d39d857","choices":[{"index":0,"delta"
:{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-A6aNCrs7LgMTsQlex5k1tdPIDfPfg","object":"chat.completion.chunk","created":1726133150,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_483d39d857","choices":[{"index":0,"delta":{"con
tent":"Yes"},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-A6aNCrs7LgMTsQlex5k1tdPIDfPfg","object":"chat.completion.chunk","created":1726133150,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_483d39d857","choices":[{"index":0,"delta":{"con
tent":","},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-A6aNCrs7LgMTsQlex5k1tdPIDfPfg","object":"chat.completion.chunk","created":1726133150,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_483d39d857","choices":[{"index":0,"delta":{"con
tent":" here"},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-A6aNCrs7LgMTsQlex5k1tdPIDfPfg","object":"chat.completion.chunk","created":1726133150,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_483d39d857","choices":[{"index":0,"delta":{"con
tent":"."},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-A6aNCrs7LgMTsQlex5k1tdPIDfPfg","object":"chat.completion.chunk","created":1726133150,"model":"gpt-4o-mini-2024-07-18","system_fingerprint":"fp_483d39d857","choices":[{"index":0,"delta":{},"l
ogprobs":null,"finish_reason":"stop"}]}

data: [DONE]

The problem is that our streaming helpers in OpenAPI runtime operate as extensions on AsyncSequence<ArraySlice<UInt8>>, which means they're only available when the generated code is dealing with a binary response body.

This works fine when the documented content type is text/event-stream but with application/json we don't have access to these helpers.

This appears to be an OpenAPI doc issue because it is actually returning a text/event-stream, which trips the runtime validation in any event.

Swift/ErrorType.swift:253: Fatal error: Error raised at top level: Client error - cause description: 'Unexpected content type, expected: application/json, received: text/event-stream; charset=utf-8', underlying error: Unexpected content type, expected: application/json, received: text/event-stream; ...

Let me play with this a bit more and report back, it might be we just need the doc to be patched. ISTR this used to work :)

@simonjbeaumont
Copy link
Collaborator

simonjbeaumont commented Sep 12, 2024

OK, so I think there's potentially a combination of issues here:

  1. The OpenAPI document for this API specifies application/json as the content-type but will actually send text/event-stream based on a boolean parameter.
  2. The server is sending a [DONE] at the end of the SSE stream, which doesn't decode using the asDecodedServerSentEventsWithJSONData helper.

(1) will need to be addressed in the upstream OpenAPI doc, by just adding another content-type to the response, which I have done locally. In the meantime, you might be able to swizzle the content-type in a client middleware.

--- a/openapi.yaml
+++ b/openapi.yaml
@@ -61,6 +61,9 @@ paths:
                         application/json:
                             schema:
                                 $ref: "#/components/schemas/CreateChatCompletionResponse"
+                        text/event-stream:
+                            schema:
+                                $ref: "#/components/schemas/CreateChatCompletionResponse"

             x-oaiMeta:
                 name: Create chat completion

Once (1) is addressed, you immediately run into (2), where it will happily decode all the events but will get stuck on the last event, [DONE]:

Swift.DecodingError.dataCorrupted(Swift.DecodingError.Context(codingPath: [], debugDescription: "The given data was not valid JSON.", underlyingError: Optional(Error Domain=NSCocoaErrorDomain Code=3840 "Unexpected character 'D' around line 1, column 2." UserInfo={NSDebugDescription=Unexpected character 'D' around line 1, column 2., NSJSONSerializationErrorIndex=1})))

I have reproduced this in a mock client now and the workaround is to decode it using asDecodedServerSentEvents, filter out the [DONE] event, re-encode, and then decode again as the expected type:

// Replace this...
let responses = try response.ok.body.text_event_hyphen_stream
    .asDecodedServerSentEventsWithJSONData(of: Components.Schemas.CreateChatCompletionStreamResponse.self)

```swift
// ...with this...
let responses = try response.ok.body.text_event_hyphen_stream
    .asDecodedServerSentEvents()
    .filter { $0.data != "[DONE]" }
    .asEncodedServerSentEvents()
    .asDecodedServerSentEventsWithJSONData(of:
        Components.Schemas.CreateChatCompletionStreamResponse.self)

The re-encode/decode step will be unnecessary cost, and you could just encode with a map but the above will mean you don't have to bother with providing your own appropriately configured JSONDecoder.

I'll take a note to see if this [DONE] should work out-of-the box.

@paulhdk
Copy link
Contributor Author

paulhdk commented Sep 12, 2024

OK, so I think there's potentially a combination of issues here:

  1. The OpenAPI document for this API specifies application/json as the content-type but will actually send text/event-stream based on a boolean parameter.

@czechboy0 and I had actually worked that one out already in #614. I submitted a PR to the OpenAI OpenAPI repo, but haven't heard anything yet. Should've mentioned that in my initial issue description ...

  1. The server is sending a [DONE] at the end of the SSE stream, which doesn't decode using the asDecodedServerSentEventsWithJSONData helper.

Yes, that's exactly the problem I'm facing.

(1) will need to be addressed in the upstream OpenAPI doc, by just adding another content-type to the response, which I have done locally. In the meantime, you might be able to swizzle the content-type in a client middleware.

--- a/openapi.yaml
+++ b/openapi.yaml
@@ -61,6 +61,9 @@ paths:
                         application/json:
                             schema:
                                 $ref: "#/components/schemas/CreateChatCompletionResponse"
+                        text/event-stream:
+                            schema:
+                                $ref: "#/components/schemas/CreateChatCompletionResponse"

             x-oaiMeta:
                 name: Create chat completion

Once (1) is addressed, you immediately run into (2), where it will happily decode all the events but will get stuck on the last event, [DONE]:

Swift.DecodingError.dataCorrupted(Swift.DecodingError.Context(codingPath: [], debugDescription: "The given data was not valid JSON.", underlyingError: Optional(Error Domain=NSCocoaErrorDomain Code=3840 "Unexpected character 'D' around line 1, column 2." UserInfo={NSDebugDescription=Unexpected character 'D' around line 1, column 2., NSJSONSerializationErrorIndex=1})))

I have reproduced this in a mock client now and the workaround is to decode it using asDecodedServerSentEvents, filter out the [DONE] event, re-encode, and then decode again as the expected type:

// Replace this...
let responses = try response.ok.body.text_event_hyphen_stream
    .asDecodedServerSentEventsWithJSONData(of: Components.Schemas.CreateChatCompletionStreamResponse.self)

```swift
// ...with this...
let responses = try response.ok.body.text_event_hyphen_stream
    .asDecodedServerSentEvents()
    .filter { $0.data != "[DONE]" }
    .asEncodedServerSentEvents()
    .asDecodedServerSentEventsWithJSONData(of:
        Components.Schemas.CreateChatCompletionStreamResponse.self)

Brilliant! That works.
Again, thank you both for your quick and helpful responses.

The re-encode/decode step will be unnecessary cost, and you could just encode with a map but the above will mean you don't have to bother with providing your own appropriately configured JSONDecoder.

I'll take a note to see if this [DONE] should work out-of-the box.

I'll go ahead and close the issue though? Or would you prefer to keep it open?

@simonjbeaumont
Copy link
Collaborator

simonjbeaumont commented Sep 12, 2024

@czechboy0 and I had actually worked that one out already ... Should've mentioned that in my initial issue description

Ah, well. Now I've been on the voyage of discovery too 😅

Brilliant! That works. Again, thank you both for your quick and helpful responses.

You're very welcome—glad to be of help 👍

The re-encode/decode step will be unnecessary cost, and you could just encode with a map but the above will mean you don't have to bother with providing your own appropriately configured JSONDecoder.
I'll take a note to see if this [DONE] should work out-of-the box.

I'll go ahead and close the issue though? Or would you prefer to keep it open?

Leave it open for now as a reminder for me. Tomorrow I'll close it and open one with a focussed description to investigate what, if anything, we need to do about the [DONE], in general.

@simonjbeaumont
Copy link
Collaborator

simonjbeaumont commented Sep 13, 2024

So I thought it would be fun to have a chat with ChatGPT itself about the API choice to terminate an SSE stream where all the payloads are JSON with a non-JSON value. It acknowledges that this is both unconventional and not very helpful for processing pipelines that are decoding the event stream :)

This was after I did my own research and came to the following conclusions:

  1. WHATWG HTML Living Standard, 9.2 Server-sent events doesn't specify anything about how to terminate the stream.
  2. Many APIs that offer SSE terminate the stream by closing the connection.
  3. Looks like this [DONE] thing has also been adopted by Anthropic, but outside of the OpenAI and Anthropic APIs, I can't see any other use of this.

If there were wider precedent for it, I think we could adapt the helpers in some way.

The only thing I can think of currently is to add an optional parameter to asDecodedServerSentEventsWithJSONData(of:), to specify a known terminal message, which would allow you to call it like this:

.asDecodedServerSentEventsWithJSONData(of: ExpectedType.self, streamTerminalData: "[DONE]")

Better yet, maybe a boolean closure parameter.

I'll park this here and see if more data points come up.

@paulhdk
Copy link
Contributor Author

paulhdk commented Sep 13, 2024

So I thought it would be fun to have a chat with ChatGPT itself about the API choice to terminate an SSE stream where all the payloads are JSON with a non-JSON value. It acknowledges that this is both unconventional and not very helpful for processing pipelines that are decoding the event stream :)

Well, at least we’re not the only ones who think so then … :-)

This was after I did my own research and came to the following conclusions:

  1. WHATWG HTML Living Standard, 9.2 Server-sent events doesn't specify anything about how to terminate the stream.
  2. Many APIs that offer SSE terminate the stream by closing the connection.
  3. Looks like this [DONE] thing has also been adopted by Anthropic, but outside of the OpenAI and Anthropic APIs, I can't see any other use of this.

Very interesting!

If there were wider precedent for it, I think we could adapt the helpers in some way.

The only thing I can think of currently is to add an optional parameter to asDecodedServerSentEventsWithJSONData(of:), to specify a known terminal message, which would allow you to call it like this:

.asDecodedServerSentEventsWithJSONData(of: ExpectedType.self, streamTerminalData: "[DONE]")

Better yet, maybe a boolean closure parameter.

Would you mind me giving a PR a go? Or wouldn’t you merge it anyway until there’s more precedent.

I'll park this here and see if more data points come up.

Thank you for doing some more digging on this!

@simonjbeaumont
Copy link
Collaborator

If there were wider precedent for it, I think we could adapt the helpers in some way.
The only thing I can think of currently is to add an optional parameter to asDecodedServerSentEventsWithJSONData(of:), to specify a known terminal message, which would allow you to call it like this:

.asDecodedServerSentEventsWithJSONData(of: ExpectedType.self, streamTerminalData: "[DONE]")

Better yet, maybe a boolean closure parameter.

Would you mind me giving a PR a go? Or wouldn’t you merge it anyway until there’s more precedent?

I'm pretty open to that change, so long as it's API-backwards compatible and opt-in, and we're always grateful for folks willing to contribute! 🙏

What do you think @czechboy0?

We should probably discuss the shape of the parameter before you put any time into the coding though. There are at least a few options for it, e.g.:

  1. Is the argument used as filter or a terminator?
  2. Is the argument provided as a value to match against, or a predicate closure?

@czechboy0
Copy link
Collaborator

I also like the idea of customizing the terminating byte sequence. I think it should be a closure that takes an ArraySlice<UInt8> and returns Bool, represent a terminal sequence (meaning we actually close the sequence and always return nil afterwards). We can then have one more variant that takes ArraySlice<UInt8> directly as a convenience, that calls into the previous one.

@simonjbeaumont
Copy link
Collaborator

@czechboy0 wrote

I think it should be a closure that takes an ArraySlice<UInt8> and returns Bool...

Yeah, that'll do.

represent a terminal sequence (meaning we actually close the sequence and always return nil afterwards)

My lean, too. I think this is much more opinionated and useful than a filter.

Probably want one of these for both asDecodedServerSentEvents and asDecodedServerSentEventsWithJSONData, for symmetry.

@paulhdk wrote

Would you mind me giving a PR a go?

Wanna take a stab at it?

@czechboy0
Copy link
Collaborator

Probably want one of these for both asDecodedServerSentEvents and asDecodedServerSentEventsWithJSONData, for symmetry.

Agreed.

I do not think we need these for JSON Sequence and JSON Lines, as those require the content to be JSON (unlike SSE). Does anyone disagree?

@simonjbeaumont
Copy link
Collaborator

I do not think we need these for JSON Sequence and JSON Lines, as those require the content to be JSON (unlike SSE). Does anyone disagree?

Agreed.

@paulhdk
Copy link
Contributor Author

paulhdk commented Sep 16, 2024

@paulhdk wrote

Would you mind me giving a PR a go?

Wanna take a stab at it?

Sure! I'd love to give at shot.

Thank you both for the design details!

@paulhdk paulhdk changed the title Handling non-JSON payloads with asDecodedServerSentEventsWithJSONData() EventStreams: Handling non-JSON payloads with asDecodedServerSentEventsWithJSONData() Sep 17, 2024
@paulhdk paulhdk changed the title EventStreams: Handling non-JSON payloads with asDecodedServerSentEventsWithJSONData() Handling non-JSON payloads with asDecodedServerSentEventsWithJSONData() Sep 17, 2024
@paulhdk
Copy link
Contributor Author

paulhdk commented Sep 17, 2024

Here we go. Hope these are along the lines of what you had in mind:

czechboy0 added a commit to apple/swift-openapi-runtime that referenced this issue Oct 3, 2024
### Motivation

As discussed in
apple/swift-openapi-generator#622, some APIs,
e.g., ChatGPT or Claude, may return a non-JSON byte sequence to
terminate a stream of events.
If not handled with a workaround (see below)such non-JSON terminating
byte sequences cause a decoding error.

### Modifications

This PR adds the ability to customise the terminating byte sequence by
providing a closure to `asDecodedServerSentEvents()` as well as
`asDecodedServerSentEventsWithJSONData()` that can match incoming data
for the terminating byte sequence before it is decoded into JSON, for
instance.

### Result

Instead of having to decode and re-encode incoming events to filter out
the terminating byte sequence - as seen in
apple/swift-openapi-generator#622 (comment)
- terminating byte sequences can now be cleanly caught by either
providing a closure or providing the terminating byte sequence directly
when calling `asDecodedServerSentEvents()` and
`asDecodedServerSentEventsWithJSONData()`.

### Test Plan

This PR includes unit tests that test the new function parameters as
part of the existing tests for `asDecodedServerSentEvents()` as well as
`asDecodedServerSentEventsWithJSONData()`.

---------

Co-authored-by: Honza Dvorsky <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/support Adopter support requests. status/triage Collecting information required to triage the issue.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants