Support streaming API to help avoid timeouts #3

I'm seeing a lot of timeouts where the `openai_davinci()` SQL function takes too long to load. The streaming API - adding `"stream": true` to the API JSON call - could be a neat way to avoid this... but it's obviously not compatible with running inside a SQL execution.

So how about a separate mechanism where you can define a SQL query that returns a prompt, and Datasette then gives you a separate UI which executes that prompt in a streaming manner and streams it to your browser?

This might also provide a neat way to add features like rate-limiting, and to hide the API key while still allowing users to use it.

Comments
One curious way to implement this would be as an output renderer: https://docs.datasette.io/en/stable/plugin_hooks.html#register-output-renderer-datasette

It could fire on queries that return a single row with columns `prompt`, `model` and `max_tokens`.

It would need to return a streaming response, which I think can be done by overriding the `asgi_send()` method on Datasette's `Response` class.
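To make the streaming part concrete before the full prototype below, here is a minimal sketch of an overridden `asgi_send()` with the OpenAI specifics left out. The subclass name and the hard-coded chunks are made up for illustration; it assumes `Response.asgi_send(send)` is the coroutine Datasette calls to emit the response, which is what the prototype relies on:

```python
from datasette import Response


class StreamingTextResponse(Response):
    async def asgi_send(self, send):
        # Send the response head first.
        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [[b"content-type", b"text/plain"]],
            }
        )
        # Emit body chunks as they become available, flagging
        # more_body=True so the connection stays open.
        for chunk in ("one\n", "two\n", "three\n"):
            await send(
                {
                    "type": "http.response.body",
                    "body": chunk.encode("utf-8"),
                    "more_body": True,
                }
            )
        # A final empty message (more_body defaults to False) closes it.
        await send({"type": "http.response.body", "body": b""})
```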
Built a working prototype:

```diff
diff --git a/datasette_openai/__init__.py b/datasette_openai/__init__.py
index b8cca70..1a3091a 100644
--- a/datasette_openai/__init__.py
+++ b/datasette_openai/__init__.py
@@ -1,5 +1,6 @@
-from datasette import hookimpl
+from datasette import hookimpl, Response
 import httpx
+import json
 import struct
@@ -56,3 +57,69 @@ def decode(blob):
 def encode(values):
     return struct.pack("f" * 1536, *values)
+
+
+def can_render(columns):
+    return {"prompt", "model", "max_tokens"}.issubset(columns)
+
+
+async def render(rows):
+    row = dict(rows[0])
+    prompt = row["prompt"]
+    model = row["model"]
+    max_tokens = row["max_tokens"]
+    api_key = "sk-..."
+
+    class GptResponse(Response):
+        async def asgi_send(self, send):
+            headers = {}
+            headers["content-type"] = "text/plain"
+            raw_headers = [
+                [key.encode("utf-8"), value.encode("utf-8")]
+                for key, value in headers.items()
+            ]
+            await send(
+                {
+                    "type": "http.response.start",
+                    "status": 200,
+                    "headers": raw_headers,
+                }
+            )
+            client = httpx.AsyncClient()
+            async with client.stream(
+                "POST",
+                "https://api.openai.com/v1/completions",
+                headers={"Authorization": f"Bearer {api_key}"},
+                json={
+                    "model": model,
+                    "prompt": prompt,
+                    "max_tokens": max_tokens,
+                    # "temperature": temperature,
+                    "stream": True,
+                },
+                timeout=15.0,
+            ) as response:
+                async for line in response.aiter_lines():
+                    if line.startswith("data: {"):
+                        decoded = json.loads(line.split("data: ", 1)[1])
+                        bit = decoded["choices"][0]["text"]
+                        await send(
+                            {
+                                "type": "http.response.body",
+                                "body": bit.encode("utf-8"),
+                                "more_body": True,
+                            }
+                        )
+
+            await send({"type": "http.response.body", "body": b""})
+
+    return GptResponse()
+
+
+@hookimpl
+def register_output_renderer(datasette):
+    return {
+        "extension": "openai",
+        "render": render,
+        "can_render": can_render,
+    }
```

This streamed a line at a time.
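For reference, the reason the prototype only parses lines starting with `data: {` is the shape of the completions streaming body: each event arrives as an SSE `data:` line carrying a JSON payload, and the stream ends with a `data: [DONE]` sentinel. An abbreviated sample (the field values here are made up):

```python
import json

# Hedged sample of what the streamed lines look like; the [DONE] sentinel
# is conveniently skipped by startswith("data: {").
sample_lines = [
    'data: {"choices": [{"text": "Hello", "index": 0}]}',
    'data: {"choices": [{"text": " world", "index": 0}]}',
    "data: [DONE]",
]

for line in sample_lines:
    if line.startswith("data: {"):
        decoded = json.loads(line.split("data: ", 1)[1])
        print(decoded["choices"][0]["text"], end="")
print()  # -> Hello world
```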
Good proof of concept. There are some decisions I still need to make.
I got ChatGPT to generate this:

```javascript
const eventSource = new EventSource("API_ENDPOINT_URL");
const textarea = document.getElementById("textarea-id");

eventSource.onmessage = (event) => {
  textarea.value += event.data + '\n';
};
```

It said that this won't handle errors, so I got it to produce this instead:

```javascript
let eventSource = new EventSource("API_ENDPOINT_URL");
const textarea = document.getElementById("textarea-id");

let connectionAttempts = 0;
const maxConnectionAttempts = 5;

eventSource.onmessage = (event) => {
  textarea.value += event.data + '\n';
};

eventSource.onerror = (event) => {
  if (event.target.readyState === EventSource.CLOSED) {
    if (connectionAttempts < maxConnectionAttempts) {
      console.log(`Connection lost. Attempting to reconnect (attempt ${connectionAttempts})...`);
      connectionAttempts++;
      setTimeout(() => {
        eventSource.close();
        eventSource = new EventSource("API_ENDPOINT_URL");
      }, 5000);
    } else {
      console.log(`Maximum connection attempts reached. Giving up.`);
    }
  }
};
```

It took a few prompts to get that final result.
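One wrinkle with the `EventSource` approach: the browser only accepts responses served as `text/event-stream` with SSE-framed messages, while the prototype above streams `text/plain`. A hedged sketch of the reframing the server side would need (the helper name is made up):

```python
def sse_format(chunk: str) -> bytes:
    # Server-sent events frame each message as one or more "data:" lines
    # followed by a blank line; multi-line chunks get one data: line each.
    return (
        b"".join(
            b"data: " + line.encode("utf-8") + b"\n"
            for line in chunk.split("\n")
        )
        + b"\n"
    )


# Inside asgi_send() the header would become:
#     headers["content-type"] = "text/event-stream"
# and each streamed bit would be sent as sse_format(bit) rather than
# bit.encode("utf-8").
```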
That prototype in an easier to copy-and-paste format:

```python
from datasette import hookimpl, Response
import httpx
import json


def can_render(columns):
    return {"prompt", "model", "max_tokens"}.issubset(columns)


async def render(rows):
    row = dict(rows[0])
    prompt = row["prompt"]
    model = row["model"]
    max_tokens = row["max_tokens"]
    api_key = "sk-..."

    class GptResponse(Response):
        async def asgi_send(self, send):
            headers = {}
            headers["content-type"] = "text/plain"
            raw_headers = [
                [key.encode("utf-8"), value.encode("utf-8")]
                for key, value in headers.items()
            ]
            await send(
                {
                    "type": "http.response.start",
                    "status": 200,
                    "headers": raw_headers,
                }
            )
            client = httpx.AsyncClient()
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/completions",
                headers={"Authorization": f"Bearer {api_key}"},
                json={
                    "model": model,
                    "prompt": prompt,
                    "max_tokens": max_tokens,
                    # "temperature": temperature,
                    "stream": True,
                },
                timeout=15.0,
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("data: {"):
                        decoded = json.loads(line.split("data: ", 1)[1])
                        bit = decoded["choices"][0]["text"]
                        await send(
                            {
                                "type": "http.response.body",
                                "body": bit.encode("utf-8"),
                                "more_body": True,
                            }
                        )

            await send({"type": "http.response.body", "body": b""})

    return GptResponse()


@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "openai",
        "render": render,
        "can_render": can_render,
    }
```
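For completeness, a hedged sketch of exercising the renderer end-to-end with Datasette's internal `datasette.client` helper. The SQL, model name and in-memory `_memory` path are illustrative assumptions, the plugin above is assumed to be installed, and a real API key would need to replace the `sk-...` placeholder for the upstream call to succeed:

```python
import asyncio

from datasette.app import Datasette


async def demo():
    # memory=True exposes an in-memory database named _memory.
    ds = Datasette(memory=True)
    sql = (
        "select 'Tell me a joke' as prompt, "
        "'text-davinci-003' as model, 64 as max_tokens"
    )
    # Requesting the .openai extension instead of .json routes the
    # single-row result through can_render() and then render(), which
    # streams the completion back as plain text.
    response = await ds.client.get("/_memory.openai", params={"sql": sql})
    print(response.text)


asyncio.run(demo())
```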