Support streaming API to help avoid timeouts #3

Open

simonw opened this issue Jan 11, 2023 · 6 comments
Labels: enhancement (New feature or request)

Comments


simonw commented Jan 11, 2023

I'm seeing a lot of timeouts where the openai_davinci() SQL function takes too long to return, so the page fails to load.

The streaming API (adding "stream": true to the JSON API call) could be a neat way to avoid this... but it's obviously not compatible with running inside a SQL function execution.
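
For context, with "stream": true the completions endpoint sends back a sequence of data: lines rather than a single JSON body, roughly like this (field values invented for illustration):

data: {"id": "cmpl-...", "object": "text_completion", "choices": [{"text": "Two", "index": 0}], "model": "text-davinci-003"}

data: {"id": "cmpl-...", "object": "text_completion", "choices": [{"text": " soulmates", "index": 0}], "model": "text-davinci-003"}

data: [DONE]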

So how about a separate mechanism where you can define a SQL query that returns a prompt, and Datasette then gives you a separate UI that executes the prompt and streams the response back to your browser?

This might also provide a neat way to add features like rate limiting, and to hide the API key while still letting users run prompts against it.

simonw added the enhancement label Jan 11, 2023

simonw commented Jan 11, 2023

One curious way to implement this would be as an output renderer: https://docs.datasette.io/en/stable/plugin_hooks.html#register-output-renderer-datasette

It could fire on queries that return a single row with columns model, prompt, max_tokens and temperature (and maybe some more).
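
For example, here's the query from the curl test later in this thread, decoded:

select
  'write a poem about an otter and owl who are friends' as prompt,
  'text-davinci-003' as model,
  512 as max_tokens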


simonw commented Jan 11, 2023

It would need to return a streaming response, which I think can be done by overriding this asgi_send method: https://github.com/simonw/datasette/blob/8e7073404379d79a2d269167a12bbb58439edd39/datasette/utils/asgi.py#L344
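
The ASGI message sequence for a streaming response looks like this (a minimal sketch of raw ASGI, not Datasette-specific code):

await send({"type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]]})
await send({"type": "http.response.body", "body": b"first chunk", "more_body": True})
await send({"type": "http.response.body", "body": b"another chunk", "more_body": True})
# A final empty body message with more_body omitted (it defaults to False) closes the response
await send({"type": "http.response.body", "body": b""})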


simonw commented Jan 11, 2023

Built a working prototype:

diff --git a/datasette_openai/__init__.py b/datasette_openai/__init__.py
index b8cca70..1a3091a 100644
--- a/datasette_openai/__init__.py
+++ b/datasette_openai/__init__.py
@@ -1,5 +1,6 @@
-from datasette import hookimpl
+from datasette import hookimpl, Response
 import httpx
+import json
 import struct
 
 
@@ -56,3 +57,69 @@ def decode(blob):
 
 def encode(values):
     return struct.pack("f" * 1536, *values)
+
+
+def can_render(columns):
+    return {"prompt", "model", "max_tokens"}.issubset(columns)
+
+
+async def render(rows):
+    row = dict(rows[0])
+    prompt = row["prompt"]
+    model = row["model"]
+    max_tokens = row["max_tokens"]
+    api_key = "sk-..."
+
+    class GptResponse(Response):
+        async def asgi_send(self, send):
+            headers = {}
+            headers["content-type"] = "text/plain"
+            raw_headers = [
+                [key.encode("utf-8"), value.encode("utf-8")]
+                for key, value in headers.items()
+            ]
+            await send(
+                {
+                    "type": "http.response.start",
+                    "status": 200,
+                    "headers": raw_headers,
+                }
+            )
+            client = httpx.AsyncClient()
+            async with client.stream(
+                "POST",
+                "https://api.openai.com/v1/completions",
+                headers={"Authorization": f"Bearer {api_key}"},
+                json={
+                    "model": model,
+                    "prompt": prompt,
+                    "max_tokens": max_tokens,
+                    # "temperature": temperature,
+                    "stream": True,
+                },
+                timeout=15.0,
+            ) as response:
+                async for link in response.aiter_lines():
+                    if link.startswith("data: {"):
+                        decoded = json.loads(link.split("data: ", 1)[1])
+                        bit = decoded["choices"][0]["text"]
+                        await send(
+                            {
+                                "type": "http.response.body",
+                                "body": bit.encode("utf-8"),
+                                "more_body": True,
+                            }
+                        )
+
+            await send({"type": "http.response.body", "body": b""})
+
+    return GptResponse()
+
+
+@hookimpl
+def register_output_renderer(datasette):
+    return {
+        "extension": "openai",
+        "render": render,
+        "can_render": can_render,
+    }

This streamed a line at a time:

% curl 'http://127.0.0.1:8001/simonwillisonblog.openai?sql=select+%27write+a+poem+about+an+otter+and+owl+who+are+friends%27+as+prompt%2C+%27text-davinci-003%27+as+model%2C+512+as+max_tokens'

Two soulmates united,
An Otter and an Owl
A friendship so devoted,
It's a must for them to cuddle. 

A dutiful duo that loves life,
Adventuring along with no strife. 
Lighthearted, silly and comical,
Striding into the unknown—ecstatic and full.

The Otter and Owl cross paths so divine,
As if there were only one line,
Connecting their hearts through the air,
Creating a bond they'll be sure to share.

The Owl soaring high, 
His intelligence shines a light.
The Otter plunging deep,
Her love so fierce, so bright.

They'll stay together, close and tight
Keeping each other warm day and night.
An Otter and an Owl, friends 'till the end
What a special bond, for them to defend.


simonw commented Jan 11, 2023

Good proof of concept. Some decisions I need to make:

  • Where does the API key come from? (A sketch of one option follows below this list.)
  • What protections should be in place? I could say this only works for canned queries, or for users with a specific permission, or a mix of the two.
  • Can I implement some kind of rate limit? How would that work?
  • What will the UI look like? I'm tempted to have a button on the query page that says "Run this prompt" and which then shows the results inline.
  • My prototype implementation streams the results back without using server-sent-events format, but is that the right thing to do? I like that you can easily curl it, but I want to be able to implement solid JavaScript against it.
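
For the API key question, here's a minimal sketch of one option: read it from plugin configuration, falling back to an environment variable. datasette.plugin_config() is a real Datasette API, but the "api_key" setting name and the OPENAI_API_KEY fallback are just placeholders, not a settled design:

import os


def get_api_key(datasette):
    # "api_key" in the datasette-openai plugin config is a hypothetical
    # setting name; OPENAI_API_KEY is a conventional fallback
    config = datasette.plugin_config("datasette-openai") or {}
    return config.get("api_key") or os.environ.get("OPENAI_API_KEY")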


simonw commented Jan 11, 2023

I got ChatGPT to generate this:

const eventSource = new EventSource("API_ENDPOINT_URL");
const textarea = document.getElementById("textarea-id");

eventSource.onmessage = (event) => {
  textarea.value += event.data + '\n';
};

It said that this won't handle errors, so I got it to produce this instead:

let eventSource = new EventSource("API_ENDPOINT_URL");
const textarea = document.getElementById("textarea-id");
let connectionAttempts = 0;
const maxConnectionAttempts = 5;

eventSource.onmessage = (event) => {
  textarea.value += event.data + '\n';
};

eventSource.onerror = (event) => {
  if (event.target.readyState === EventSource.CLOSED) {
    if (connectionAttempts < maxConnectionAttempts) {
      console.log(`Connection lost. Attempting to reconnect (attempt ${connectionAttempts})...`);
      connectionAttempts++;
      setTimeout(() => {
        eventSource.close();
        eventSource = new EventSource("API_ENDPOINT_URL");
      }, 5000);
    } else {
      console.log(`Maximum connection attempts reached. Giving up.`);
    }
  }
};

My prompts to get that final result:

  • Write JavaScript that consumes a server sent events API endpoint and logs out the results to a textarea
  • Write the code for reconnecting
  • won't that break because eventSource is const?
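
For that EventSource code to actually work, the renderer would need to respond with content-type: text/event-stream and frame each chunk as an SSE message. A sketch of what the prototype's streaming calls would become:

headers["content-type"] = "text/event-stream"

# Each SSE message is "data: <payload>" terminated by a blank line.
# Chunks containing newlines would need splitting across multiple
# "data:" lines to stay valid SSE.
await send(
    {
        "type": "http.response.body",
        "body": "data: {}\n\n".format(bit).encode("utf-8"),
        "more_body": True,
    }
)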


simonw commented Jan 12, 2023

That prototype in an easier-to-copy-and-paste format:

from datasette import hookimpl, Response
import httpx
import json


def can_render(columns):
    return {"prompt", "model", "max_tokens"}.issubset(columns)


async def render(rows):
    row = dict(rows[0])
    prompt = row["prompt"]
    model = row["model"]
    max_tokens = row["max_tokens"]
    api_key = "sk-..."

    class GptResponse(Response):
        async def asgi_send(self, send):
            headers = {}
            headers["content-type"] = "text/plain"
            raw_headers = [
                [key.encode("utf-8"), value.encode("utf-8")]
                for key, value in headers.items()
            ]
            await send(
                {
                    "type": "http.response.start",
                    "status": 200,
                    "headers": raw_headers,
                }
            )
            client = httpx.AsyncClient()
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/completions",
                headers={"Authorization": f"Bearer {api_key}"},
                json={
                    "model": model,
                    "prompt": prompt,
                    "max_tokens": max_tokens,
                    # "temperature": temperature,
                    "stream": True,
                },
                timeout=15.0,
            ) as response:
                # Each streamed chunk arrives as a "data: {...}" line; the
                # final "data: [DONE]" marker fails this check and is skipped
                async for line in response.aiter_lines():
                    if line.startswith("data: {"):
                        decoded = json.loads(line.split("data: ", 1)[1])
                        bit = decoded["choices"][0]["text"]
                        await send(
                            {
                                "type": "http.response.body",
                                "body": bit.encode("utf-8"),
                                "more_body": True,
                            }
                        )

            await send({"type": "http.response.body", "body": b""})

    return GptResponse()


@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "openai",
        "render": render,
        "can_render": can_render,
    }
