Support actual streaming for AzureAI #1054


Conversation

@timostark commented Jul 16, 2024

Follow-up of #1042

As already raised in a few PRs, streaming is currently not really working for Azure. This PR contains the following fixes:

  1. Refactor Azure to work identically to OpenAI. The code is copied from the OpenAIModel; the retry template is not yet implemented. This looks like a lot of changes in the PR, but it is mostly a copy/paste from OpenAI (with some adjustments so that it is compliant with the Azure library).
  2. Switch to the async Azure client, so that streaming actually works. The Azure documentation is very hard to understand, but after testing it is clear that only the async library is actually async and streams at least line by line (not token by token); see Fix improper streaming in Azure Client #796. A minimal sketch of the async streaming call follows this list. Note: this change might be breaking for Spring AI consumers, as this will cause function al…
  3. Roll back to Azure beta 8, due to JSON decoder errors (see [BUG] Streaming does not work with Spring AI and Azure OpenAI, Azure/azure-sdk-for-java#40629).
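
For context, a minimal hedged sketch of what streaming with the Azure async client looks like; the endpoint, key variable, and deployment name below are placeholders, not values from this PR:

    import java.util.List;

    import com.azure.ai.openai.OpenAIAsyncClient;
    import com.azure.ai.openai.OpenAIClientBuilder;
    import com.azure.ai.openai.models.ChatCompletionsOptions;
    import com.azure.ai.openai.models.ChatRequestUserMessage;
    import com.azure.core.credential.AzureKeyCredential;

    public class AzureAsyncStreamingSketch {

        public static void main(String[] args) {
            // Placeholder endpoint, key env var, and deployment name.
            OpenAIAsyncClient client = new OpenAIClientBuilder()
                    .endpoint("https://<your-resource>.openai.azure.com")
                    .credential(new AzureKeyCredential(System.getenv("AZURE_OPENAI_API_KEY")))
                    .buildAsyncClient();

            // getChatCompletionsStream returns a non-blocking Flux of partial completions.
            client.getChatCompletionsStream(
                            "my-deployment",
                            new ChatCompletionsOptions(List.of(new ChatRequestUserMessage("Hello"))))
                    .doOnNext(chunk -> chunk.getChoices().forEach(choice -> {
                        String delta = choice.getDelta().getContent();
                        if (delta != null) { // early chunks may carry only the role, no content
                            System.out.print(delta);
                        }
                    }))
                    .blockLast(); // block in main() only, so the demo does not exit early
        }
    }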

Can you have a look and check whether this is the right direction for you?

@timostark (Author)

One more remark: this PR is working nicely in a fairly large professional project with >50 functions and streaming. I do not have access to the Images service, though, so it would be nice if you could also check whether that works for you.

@timostark (Author)

Out of date now. A follow-up PR will come.

@timostark closed this Jul 16, 2024
@tzolov (Contributor) commented Jul 17, 2024

Reopening this as a reference until @timostark submits the new PR. Looking forward to it :)

@tzolov reopened this Jul 17, 2024
@tzolov (Contributor) commented Jul 17, 2024

I do not have access to the Images service, though, so it would be nice if you could also check whether that works for you.

Not sure I follow?

@tzolov self-assigned this Jul 17, 2024
@timostark (Author) commented Jul 19, 2024

Reopening this as a reference until @timostark submits the new PR. Looking forward to it :)

@tzolov: I have had to debug this for quite a while now. The point is that streaming is actually working (unfortunately not really smoothly, due to a very strange implementation on Azure's side).

So the question is why it is not working in a REST service. As far as I can see, the issue is that the reactive web services do not send out the stream results (with the example provided on the Spring AI page), because the Flux is blocked by the sync approach.

I am unfortunately not an expert in the usage of Schedulers, Flux, and threads, but my assumption is that the event-stream sending is blocked by the Azure OpenAI call on the same thread and cannot send out the events until the stream is done.
A solution is therefore to use Schedulers in my own code (see a working approach in the code below).

Using Schedulers is not required with the Azure OpenAI async library, because then there is no blocking work on the thread of the REST call.
I would normally suggest going with the async API approach, which internally creates its own thread for the Azure OpenAI call (avoiding the blocking behavior), but I am not sure whether general usage of the async library is a good idea, as it seems to be a little slower (also in the non-streaming scenario).

Alternatives would be:

  (a) just update the documentation and use Schedulers in the consumer code;
  (b) switch to the Azure async API (possibly with a Spring property to enable/disable it);
  (c) use a Scheduler inside the streaming method of Spring AI.

What is your opinion here? I would tend toward option (c), but I am not sure whether it has side effects.
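
To illustrate, option (c) could look roughly like the following inside Spring AI's stream() method. This is a hypothetical sketch, not the actual implementation; doStream stands in for the existing sync-client-backed Flux:

    // Hypothetical sketch of option (c): the model itself moves the blocking
    // Azure call off the caller's thread, so consumers need no extra Schedulers.
    public Flux<ChatResponse> stream(Prompt prompt) {
        return Flux.defer(() -> doStream(prompt))          // doStream: hypothetical sync-backed Flux
                .subscribeOn(Schedulers.boundedElastic()); // dedicated worker for the blocking call
    }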

Code that works with the current Spring AI implementation:

@GetMapping(value = "/chat/{chatId}/start", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatMessageDto> chat(@PathVariable Long chatId) {
    return chatBotService.getMessageProcessingStreamForChat(chatId) // calls the Spring AI stream
            .subscribeOn(Schedulers.boundedElastic()) // run the blocking Azure call on a worker thread
            .publishOn(Schedulers.parallel());        // deliver events off that worker thread
}

"Wrong" documentation: https://docs.spring.io/spring-ai/reference/api/chat/azure-openai-chat.html


@timostark (Author) commented Jul 20, 2024

@tzolov OK, I would need a little help to create a PR. Using subscribeOn/publishOn (as mentioned above), I was not able to find a working solution. With that approach (either in Spring AI or in my own code), the WebFlux implementation gets stuck after ~100-300 tokens: flatMapIterable is called, but map((choice)) is not called anymore (in Spring AI). That only happens when the result is forwarded to WebFlux; it might be threading or backpressure. I am not deep enough into the details of (Web)Flux, threading, and Schedulers to understand the root cause.

For me, the WebFlux stream works nicely if I isolate the handling on its own thread (probably the same thing the Azure OpenAI async library does). See the example below (sorry, this can probably be done much more nicely with built-in Flux operators; as said, I am really not an expert here):

        Sinks.Many<ChatResponse> sink = Sinks.many().multicast().onBackpressureBuffer();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Runnable task = () -> {
            var response = azureOpenAiChatModel.stream(prompt);
            response.doOnNext(sink::tryEmitNext)         // forward each chunk to the sink
                    .doOnComplete(sink::tryEmitComplete) // propagate completion
                    .doOnError(sink::tryEmitError)       // propagate errors
                    .collectList()
                    .block();                            // block only on this dedicated thread
        };
        executor.execute(task); // run the blocking call on its own thread
        return sink.asFlux();   // expose the sink as a Flux to the caller

This is of course very ugly to have outside of the library code. So, as said above: do you have a preferred solution?

@markpollack (Member)

Just a quick comment: we should avoid an implementation that uses block() to give the appearance of a sync call stack. Using both client classes, one for sync and one for async/reactive, would be best. I am not sure whether this addresses the issues you are observing as described above. Is the issue that sync calls aren't working when you use the async client, but async calls are? If that is the case, using the sync client for sync calls would work well.

@tzolov (Contributor) commented Jul 25, 2024

@timostark if the async client is working as expected for streaming responses, can't we use both clients: the Azure client for call and the async Azure client for stream?
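
For what it's worth, a rough sketch of that two-client idea; the class and method names are illustrative, not Spring AI's actual code:

    import com.azure.ai.openai.OpenAIAsyncClient;
    import com.azure.ai.openai.OpenAIClient;
    import com.azure.ai.openai.OpenAIClientBuilder;
    import com.azure.ai.openai.models.ChatCompletions;
    import com.azure.ai.openai.models.ChatCompletionsOptions;
    import reactor.core.publisher.Flux;

    // Illustrative only: one builder, two clients.
    class AzureChatClients {

        private final OpenAIClient syncClient;       // blocking client, used for call
        private final OpenAIAsyncClient asyncClient; // reactive client, used for stream

        AzureChatClients(OpenAIClientBuilder builder) {
            this.syncClient = builder.buildClient();
            this.asyncClient = builder.buildAsyncClient();
        }

        ChatCompletions call(String deployment, ChatCompletionsOptions options) {
            return syncClient.getChatCompletions(deployment, options);
        }

        Flux<ChatCompletions> stream(String deployment, ChatCompletionsOptions options) {
            return asyncClient.getChatCompletionsStream(deployment, options);
        }
    }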

@timostark (Author)

@tzolov Yes, certainly. I will adjust the code accordingly in the next few days and raise a PR.

@hy4470 commented Aug 30, 2024

@tzolov Yes, certainly. I will adjust the code accordingly in the next few days and raise a PR.

It looks like the bug fix was done a month ago; when can we expect it to make it into a PR?

@hy4470 commented Sep 10, 2024

@tzolov @timostark
Is no one following up on this issue?

@timostark (Author) commented Sep 10, 2024

@hy4470 @tzolov Sorry, I forgot about this issue. I am currently blocked by my work projects, but just FYI: there is a functional workaround to get streaming working with the current state of Spring AI (Azure) by simply running azureOpenAiChatModel.stream inside an isolated Runnable with a separate event sink. That is really not nice code, but IMO good enough as a functional workaround.

Regarding the suggested change: I am really not sure whether that is the right general direction for a central library like Spring AI. I wasn't able to find meaningful documentation for the Azure async client (what exactly is the difference between async and non-async?), so I am worried that switching everything to the async Azure client has side effects I am not aware of (e.g. bad performance, missed tokens, or simply deprecated functionality).
Therefore, for the moment, I tend to just work with my workaround.

Extremely simplified, functional code for the workaround:

    private Flux<ChatResponse> sampleCode() {
        Sinks.Many<ChatResponse> eventSink = Sinks.many().multicast().onBackpressureBuffer();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Runnable task = () -> {
            var response = azureOpenAiChatModel.stream(new Prompt());
            response.onBackpressureBuffer()
                    .doOnError(eventSink::tryEmitError) // propagate stream errors to the sink
                    .doOnNext(eventSink::tryEmitNext)   // forward each chunk to the sink
                    .collectList()
                    .block();                           // block only on this dedicated thread

            eventSink.tryEmitComplete();                // signal completion to subscribers
        };
        executor.execute(task);
        executor.shutdown();                            // no further tasks; shut down when done

        return eventSink.asFlux();
    }

You can return this Flux from a GetMapping that produces TEXT_EVENT_STREAM_VALUE:

@GetMapping(value = "/chat/{chatId}/start", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
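public Flux<ChatResponse> chat(@PathVariable Long chatId) {
    // Hypothetical completion of the mapping above; the method name, path
    // variable, and sampleCode() reference are illustrative.
    return sampleCode(); // the sink-backed Flux from the workaround above
}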

@csterwa added this to the 1.0.0-M3 milestone Sep 25, 2024
@bruno-oliveira (Contributor)

Hello all!
Do you foresee this making it into Milestone 3, and if so, do you think the expected release date of October 4th will be met?

Great work on the library all around, by the way! We are happily using it in production and it works like a charm. I am asking about the deadline because we have some use cases that would benefit from streaming being integrated into the Azure bean, just like it works for the Bedrock/Anthropic ones!

Once again, really great work so far!

@markpollack (Member)

This is working now; see PR #1447 for the details.

Thanks for everyone's patience!

@markpollack closed this Oct 6, 2024
@hy4470 commented Oct 6, 2024

This is working now; see PR #1447 for the details.

Thanks for everyone's patience!

I tested the modified version and found that many tokens are being lost. Has anyone else experienced this issue?

@bruno-oliveira (Contributor)

@hy4470 I’ll give this a try when Milestone 3 is officially released.
For now, my team is just going with the low-level client for Azure, the “direct one” from the Azure SDK, which works well enough…

@bruno-oliveira (Contributor)

@tzolov @markpollack Is milestone 3 officially available? I’ll try this out if it is 🥳

@sobychacko (Contributor)

Yes, M3 is available now. https://spring.io/blog/2024/10/08/spring-ai-1-0-0-m3-released

@bruno-oliveira (Contributor)

Yes, very awesome!! I’ve just seen this now; I was knee-deep in implementing stuff and this slipped under my radar. Will try it out today! Thank you! 🙏
