Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket Support for Streaming Input and Output #320

Open
ChenghaoMou opened this issue Oct 3, 2024 · 12 comments
Open

Websocket Support for Streaming Input and Output #320

ChenghaoMou opened this issue Oct 3, 2024 · 12 comments
Labels
enhancement New feature or request question Further information is requested

Comments

@ChenghaoMou
Copy link


🚀 Feature

Support websocket endpoints to allow two-way real-time data communication.

Motivation

Currently, the requests are processed with the expectation that the data is complete and stateless. However, the input data isn't always ready immediately for use cases like speech to text, text to speech, audio/speech understanding, especially in time-sensitive situations. With the recent release of Realtime API from OpenAI and a new family of voice AI models (ultravox, mini-omni, llama-omni, moshi), support for streaming input and output could benefit the community in many ways and unlock even more creative uses of AI models.

Pitch

Support streaming input and output with websocket or any other methods to allow real-time AI applications.

Alternatives

A typical FastAPI websocket implementation is very template-like:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_bytes()
            # process data
            # results = model(parse(message))
            await websocket.send_json(results)
    except WebSocketDisconnect:
        logger.error("WebSocket disconnected")
    except Exception as e:
        logger.error(f"Error: {e}")
        if websocket.client_state != WebSocketState.DISCONNECTED:
            await websocket.close(code=1001)
    finally:
        # clean up

However, this might make the batching impossible or complicated.

I am new to this repo, so if there is a workaround by hacking the server/spec/api to allow websocket, I am more than happy to contribute. If this is duplicate/irrelevant, sorry for the trouble.

Thanks a million for open sourcing this awesome project. ❤️

Additional context

@ChenghaoMou ChenghaoMou added the enhancement New feature or request label Oct 3, 2024
@aniketmaurya
Copy link
Collaborator

hi @ChenghaoMou, thank you for looking into and trying LitServe 💜 !! We support streaming which can be enabled by adding a stream=True argument to LitServer class. We have the streaming documentation here.

Please let me know if it helps.

@ChenghaoMou
Copy link
Author

Thanks for the prompt response! @aniketmaurya

If I am reading the documentation right, the current streaming is only for the output, not the input. It feels more like server side events (one input and multiple outputs) rather than websocket (streaming both input and output). The difference could be, for example, in speech to text:

  1. Existing streaming: upload an entire audio file (input non-streaming) to get transcription "word by word" (output streaming);
  2. This request: streaming audio from a live speech (input streaming) to get transcription "word by word" (output streaming);

I hope this makes sense.

@aniketmaurya
Copy link
Collaborator

yes @ChenghaoMou, the current streaming is server-sent event. Let's keep this open and we can evaluate this feature based on requests from the community.

@aniketmaurya aniketmaurya added the question Further information is requested label Oct 4, 2024
@cyberluke
Copy link

cyberluke commented Oct 14, 2024

Come on, websocket is basic feature. That was the first thing I required after signing to PRO account. Currently they open ports only if there is GET endpoint that returns 200. Using FastAPI you just need to enable both GET request and WSS endpoint.

Me as community member replies that this is a crucial and basic element in software development for web & mobile apps.

Studio AI have some Whisper examples, but they do it wrong. They just encapsulate Whisper with StreamLit and it looks like REST API or it just processes whole audio file. It made me a little bit sad as perfectionist and software developer with 20 years of experience :-D

@dreamerwhite
Copy link

i tired something to make request like streaming in and streaming out.

  class Llama3API(ls.LitAPI):

      def setup(self, device):
          self.llm = litgpt.LLM.load("checkpoints/meta-llama/Meta-Llama-3-8B-Instruct")
  
      def decode_request(self, request):
          for i in range(10):
            yield self.model(i)
  
      def predict(self, prompts):
          for i in range(prompts):
             yield from self.llm.generate(i, max_new_tokens=200, stream=True)
      def encode_response(self, output):
  
          for out in output:
              yield {"output": out}

but i haven't tested the performance yet。

@mikewaters
Copy link

mikewaters commented Jan 5, 2025

@cyberluke @ChenghaoMou
The FastAPI framework that Lit is built on (see LitServer) has native support for websockets, even if LitServe isn't using it. Lit doesn't appear to hide the FastAPI knobs and buttons, though it does make some implementation decisions that may push you in some different direction (by directly managing asgi worker lifecycle, albeit for cool and interesting reasons).

Here's some untested and fake illustrative code:

import LitServe as ls
from fastapi import FastAPI, WebSocket

""" A typical FastAPI project defines an "app" instance, which at runtime is served by some 
asgi framework like `Uvicorn`. This usually occurs out-of-process from the app, more in 
web-server-land if you know what I mean. 
"""
#app = FastAPI()

""" Lit instead uses a `FastAPI` inside a `LitServer` instance, alongside API specs etc, and this server 
manages the app lifecycle directly by using the Uvicorn python interface.
"""
server = ls.LitServer(SomeLitAPI()) # this instantiates a `FastAPI` internally
server.run() # this directly manages ports/workers/etc with Uvicorn

"""
Below is some bog-standard FastAPI WebSocket code, which attaches a route to an `app`. 
The only change needed was to reference the FastAPI `app` that's dangling off the LitServer.
"""

@server.app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """ Copied from some FastAPI project on github """
    await websocket.accept()
    while True:
        client_message = await websocket.receive_text()
        async for ai_response in get_ai_response(client_message):
            await websocket.send_text(ai_response)

async def get_ai_response(message: str):
    """Generate responses from the AI asynchronously."""
    response = await some_llm_provider.chat(..., stream=True)
    async for chunk in response:
        yield chunk.something

Potential impediments:

  1. LitServe appears to directly manage its server process (Uvicorn), including port selection. You will obviously need another listen port for the ws protocol, and so will need to modify the Config (or use your own listener).
  2. If LitServe doesn't expose the WebSocket fn, it stands to reason that Lightning/"Studio AI" might not provision a wss listener and cert.

Team: Corrections are appreciated, as I am evaluating Lit/Lightning and have similar requirements as OP.

EDIT: Found a FastAPI WebSocket whisper streaming implementation, which one of the commenters mentioned. Maybe you can just glue this together with a LitServe.app: https://github.com/QuentinFuxa/whisper_streaming_web/blob/main/whisper_fastapi_online_server.py

@cyberluke
Copy link

cyberluke commented Jan 5, 2025

@mikewaters I came here only to complain, which is Czech / European national sport, to cheer everyone up to make Lightning-AI API better for the users :-D

By that time I wrote original paragraph, I had websocket whisper running on Lightning AI already :-) ...using custom Python with FastAPI. Basically I just got rid of all Lit and forgot Lightning AI documentation and do everything low-level as I'm used to.

But I had to bypass the exposing port feature in Lightning AI Studio because it requires that root url / of server always returns HTTP 200 for GET request. So you cannot have only WSS service. But if you tell FastAPI to expose both WSS and HTTP and just return HTTP 200 status for GET, then it will open the port and WSS will work.

If you want to be more badass then look at Rust WASM implementation of Whisper, which runs on Safari iPhone natively using WebGPU and does not waste server resources. All these "whisper_streaming_web" are buggy. I spent 2 weeks Googling and you would need to fork 30 repositories, hack your own way with cherry pick commits, with individual fixes, because community don't work properly, and then you maybe have semi-pro working solution with 70% recognition success rate (Whisper is not that precise and the streaming is hacked because OpenAI hard coded 30s buffer, so all streaming apps must fill the buffer up to 30s with some data in order to let the AI model process that).

But I made for my father, who is enterprise Azure DevOps, smart AI Powershell assistant and you can use your voice to tell OpenAI what you want and it will generate proper Azure Active Directory and Powershell commands. For such voice transmission, with end to end encryption, it could be beneficial to have websocket streaming support. (just example of use case how people might want to use it)

@cyberluke
Copy link

(there was a short thought that I would like to contribute to LitServe, but I would have to rewrite 20% of it and provide proper OpenAI API implementation so I thought it could make someone upset - but I'm still up to that idea - if someone like adventure 👯 )

@lorinczszabolcs
Copy link

I would have the same request, though for a slightly different reason: in my use-case the request is coming in multiple chunks (files), so the server would need to wait until all files are received and only then start processing them / do inference.

@bhimrazy
Copy link
Contributor

(there was a short thought that I would like to contribute to LitServe, but I would have to rewrite 20% of it and provide proper OpenAI API implementation so I thought it could make someone upset - but I'm still up to that idea - if someone like adventure 👯 )

Hi @cyberluke,
I have an idea: How about trying a custom spec similar to OpenAISpec but with WebSocket support?
If it works, you could share it in this issue so that other members can benefit from it as well.
It could even function as a plug-and-play solution.

@aniketmaurya
Copy link
Collaborator

thank you everyone for participating in this discussion! This is in our plan to add as a feature now. I like @bhimrazy's proposal of making a spec for this and it can exist as a plug-and-play solution. I think we also need to make it easier for developers to quickly implement a spec for their use case.

@ChenghaoMou
Copy link
Author

Based on the some of the suggestions above, I tested it with the following mvp code that works, at least locally, with a ws server and client:

server.py
import base64
from typing import Any

import numpy as np
from fastapi import WebSocket

import litserve as ls


class SimpleLitAPI(ls.LitAPI):
    def setup(self, device):
        self.active_connections = set()

    async def websocket_endpoint(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.add(websocket)
        try:
            while True:
                # Receive message from client
                data = await websocket.receive_json()

                # Process the input using our model
                input_value = self.decode_request(data)
                result = self.predict(input_value)

                # Send response back to client
                await websocket.send_json(self.encode_response(result))
                break
        finally:
            self.active_connections.remove(websocket)
            await websocket.close()

    def decode_request(self, request: dict[str, Any]) -> np.ndarray:
        # Convert the request payload to model input.
        input_value = base64.b64decode(request["audio"])
        return np.frombuffer(input_value, dtype=np.float32)

    def predict(self, x) -> dict[str, int]:
        return {"output": len(x)}

    def encode_response(self, output: dict[str, int]) -> dict[str, int]:
        return output


if __name__ == "__main__":
    api = SimpleLitAPI()
    # I don't know why this is needed
    api.setup("cpu")
    server = ls.LitServer(api, accelerator="auto", max_batch_size=1)
    server.app.add_api_websocket_route("/ws", api.websocket_endpoint)
    server.run(port=8000)
client.py
import argparse
import asyncio
import base64
import json
import os

import librosa
import numpy as np
import websockets


async def test_websocket(audio_path):
    uri = "ws://localhost:8000/ws"
    audio_array, _ = librosa.load(audio_path, sr=16000)
    audio_array = audio_array.astype(np.float32)[:1000]
    audio_array = audio_array.tobytes()
    audio_array = base64.b64encode(audio_array).decode("utf-8")

    async with websockets.connect(uri) as websocket:
        # Send test data
        test_data = {"audio": audio_array}
        await websocket.send(json.dumps(test_data))
        # Receive response
        response = await websocket.recv()
        print(f"Received response: {response}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Transcribe audio files")
    parser.add_argument("--file", help="Filename of the audio file to transcribe", type=str, default="mlk.mp3")
    args = parser.parse_args()

    # call the model with the file
    audio_path = os.path.join(os.getcwd(), "audio_samples", args.file)
    asyncio.run(test_websocket(audio_path))

Ofc, no model is actually involved in this quick demo.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request question Further information is requested
Projects
None yet
Development

No branches or pull requests

7 participants