From 90f4880b512c494bd7175197fc6d58b7a54566f0 Mon Sep 17 00:00:00 2001 From: josc146 Date: Mon, 30 Oct 2023 23:40:33 +0800 Subject: [PATCH] improve fetchSSE (#532) --- package-lock.json | 9 -- package.json | 1 - src/utils/eventsource-parser.mjs | 130 ++++++++++++++++++++++++++++ src/utils/fetch-sse.mjs | 43 +++++---- src/utils/index.mjs | 2 +- src/utils/stream-async-iterable.mjs | 14 --- 6 files changed, 151 insertions(+), 48 deletions(-) create mode 100644 src/utils/eventsource-parser.mjs delete mode 100644 src/utils/stream-async-iterable.mjs diff --git a/package-lock.json b/package-lock.json index 6f9b774e..c29029ea 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,6 @@ "claude-ai": "^1.2.2", "countries-list": "^2.6.1", "diff": "^5.1.0", - "eventsource-parser": "^1.0.0", "file-saver": "^2.0.5", "github-markdown-css": "^5.2.0", "gpt-3-encoder": "^1.1.4", @@ -4261,14 +4260,6 @@ "node": ">=0.8.x" } }, - "node_modules/eventsource-parser": { - "version": "1.0.0", - "resolved": "https://registry.npmmirror.com/eventsource-parser/-/eventsource-parser-1.0.0.tgz", - "integrity": "sha512-9jgfSCa3dmEme2ES3mPByGXfgZ87VbP97tng1G2nWwWx6bV2nYxm2AWCrbQjXToSe+yYlqaZNtxffR9IeQr95g==", - "engines": { - "node": ">=14.18" - } - }, "node_modules/extend": { "version": "3.0.2", "resolved": "https://registry.npmmirror.com/extend/-/extend-3.0.2.tgz", diff --git a/package.json b/package.json index c202522e..d12b3823 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,6 @@ "claude-ai": "^1.2.2", "countries-list": "^2.6.1", "diff": "^5.1.0", - "eventsource-parser": "^1.0.0", "file-saver": "^2.0.5", "github-markdown-css": "^5.2.0", "gpt-3-encoder": "^1.1.4", diff --git a/src/utils/eventsource-parser.mjs b/src/utils/eventsource-parser.mjs new file mode 100644 index 00000000..6a18a65e --- /dev/null +++ b/src/utils/eventsource-parser.mjs @@ -0,0 +1,130 @@ +// https://www.npmjs.com/package/eventsource-parser/v/1.1.1 + +function createParser(onParse) { + let isFirstChunk + let bytes + let buffer + let startingPosition + let startingFieldLength + let eventId + let eventName + let data + reset() + return { + feed, + reset, + } + function reset() { + isFirstChunk = true + bytes = [] + buffer = '' + startingPosition = 0 + startingFieldLength = -1 + eventId = void 0 + eventName = void 0 + data = '' + } + + function feed(chunk) { + bytes = bytes.concat(Array.from(chunk)) + buffer = new TextDecoder().decode(new Uint8Array(bytes)) + if (isFirstChunk && hasBom(buffer)) { + buffer = buffer.slice(BOM.length) + } + isFirstChunk = false + const length = buffer.length + let position = 0 + let discardTrailingNewline = false + while (position < length) { + if (discardTrailingNewline) { + if (buffer[position] === '\n') { + ++position + } + discardTrailingNewline = false + } + let lineLength = -1 + let fieldLength = startingFieldLength + let character + for (let index = startingPosition; lineLength < 0 && index < length; ++index) { + character = buffer[index] + if (character === ':' && fieldLength < 0) { + fieldLength = index - position + } else if (character === '\r') { + discardTrailingNewline = true + lineLength = index - position + } else if (character === '\n') { + lineLength = index - position + } + } + if (lineLength < 0) { + startingPosition = length - position + startingFieldLength = fieldLength + break + } else { + startingPosition = 0 + startingFieldLength = -1 + } + parseEventStreamLine(buffer, position, fieldLength, lineLength) + position += lineLength + 1 + } + if (position === length) { + bytes = [] + buffer = '' + } else if (position > 0) { + bytes = bytes.slice(new TextEncoder().encode(buffer.slice(0, position)).length) + buffer = buffer.slice(position) + } + } + + function parseEventStreamLine(lineBuffer, index, fieldLength, lineLength) { + if (lineLength === 0) { + if (data.length > 0) { + onParse({ + type: 'event', + id: eventId, + event: eventName || void 0, + data: data.slice(0, -1), + // remove trailing newline + }) + + data = '' + eventId = void 0 + } + eventName = void 0 + return + } + const noValue = fieldLength < 0 + const field = lineBuffer.slice(index, index + (noValue ? lineLength : fieldLength)) + let step = 0 + if (noValue) { + step = lineLength + } else if (lineBuffer[index + fieldLength + 1] === ' ') { + step = fieldLength + 2 + } else { + step = fieldLength + 1 + } + const position = index + step + const valueLength = lineLength - step + const value = lineBuffer.slice(position, position + valueLength).toString() + if (field === 'data') { + data += value ? ''.concat(value, '\n') : '\n' + } else if (field === 'event') { + eventName = value + } else if (field === 'id' && !value.includes('\0')) { + eventId = value + } else if (field === 'retry') { + const retry = parseInt(value, 10) + if (!Number.isNaN(retry)) { + onParse({ + type: 'reconnect-interval', + value: retry, + }) + } + } + } +} +const BOM = [239, 187, 191] +function hasBom(buffer) { + return BOM.every((charCode, index) => buffer.charCodeAt(index) === charCode) +} +export { createParser } diff --git a/src/utils/fetch-sse.mjs b/src/utils/fetch-sse.mjs index 939f8bcf..059cc043 100644 --- a/src/utils/fetch-sse.mjs +++ b/src/utils/fetch-sse.mjs @@ -1,5 +1,4 @@ -import { createParser } from 'eventsource-parser' -import { streamAsyncIterable } from './stream-async-iterable' +import { createParser } from './eventsource-parser.mjs' export async function fetchSSE(resource, options) { const { onMessage, onStart, onEnd, onError, ...fetchOptions } = options @@ -13,34 +12,32 @@ export async function fetchSSE(resource, options) { } const parser = createParser((event) => { if (event.type === 'event') { - if (event.data === '[DONE]') { - onMessage(event.data) - } else { - try { - JSON.parse(event.data) - onMessage(event.data) - } catch (error) { - console.error('json error', error) - onMessage( - event.data - .replace(/^"|"$/g, '') - .replaceAll('\\"', '"') - .replaceAll('\\\\u', '\\u') - .replaceAll('\\\\n', '\\n'), - ) - } - } + onMessage(event.data) } }) let hasStarted = false - for await (const chunk of streamAsyncIterable(resp.body)) { - const str = new TextDecoder().decode(chunk) - parser.feed(str) - + const reader = resp.body.getReader() + let result + while (!(result = await reader.read()).done) { + const chunk = result.value if (!hasStarted) { + const str = new TextDecoder().decode(chunk) hasStarted = true await onStart(str) + + let fakeSseData + try { + const commonResponse = JSON.parse(str) + fakeSseData = 'data: ' + JSON.stringify(commonResponse) + '\n\ndata: [DONE]\n\n' + } catch (error) { + console.debug('not common response', error) + } + if (fakeSseData) { + parser.feed(new TextEncoder().encode(fakeSseData)) + break + } } + parser.feed(chunk) } await onEnd() } diff --git a/src/utils/index.mjs b/src/utils/index.mjs index b61e7dc6..ead5bba7 100644 --- a/src/utils/index.mjs +++ b/src/utils/index.mjs @@ -16,5 +16,5 @@ export * from './open-url' export * from './parse-float-with-clamp' export * from './parse-int-with-clamp' export * from './set-element-position-in-viewport' -export * from './stream-async-iterable' +export * from './eventsource-parser.mjs' export * from './update-ref-height' diff --git a/src/utils/stream-async-iterable.mjs b/src/utils/stream-async-iterable.mjs deleted file mode 100644 index 8586e20f..00000000 --- a/src/utils/stream-async-iterable.mjs +++ /dev/null @@ -1,14 +0,0 @@ -export async function* streamAsyncIterable(stream) { - const reader = stream.getReader() - try { - while (true) { - const { done, value } = await reader.read() - if (done) { - return - } - yield value - } - } finally { - reader.releaseLock() - } -}