improve fetchSSE (#532)
josStorer committed Oct 30, 2023
1 parent 0f3ec38 commit 90f4880
Showing 6 changed files with 151 additions and 48 deletions.
9 changes: 0 additions & 9 deletions package-lock.json

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion package.json
@@ -25,7 +25,6 @@
"claude-ai": "^1.2.2",
"countries-list": "^2.6.1",
"diff": "^5.1.0",
"eventsource-parser": "^1.0.0",
"file-saver": "^2.0.5",
"github-markdown-css": "^5.2.0",
"gpt-3-encoder": "^1.1.4",
130 changes: 130 additions & 0 deletions src/utils/eventsource-parser.mjs
@@ -0,0 +1,130 @@
// https://www.npmjs.com/package/eventsource-parser/v/1.1.1

function createParser(onParse) {
  let isFirstChunk
  let bytes
  let buffer
  let startingPosition
  let startingFieldLength
  let eventId
  let eventName
  let data
  reset()
  return {
    feed,
    reset,
  }
  function reset() {
    isFirstChunk = true
    bytes = []
    buffer = ''
    startingPosition = 0
    startingFieldLength = -1
    eventId = void 0
    eventName = void 0
    data = ''
  }

  function feed(chunk) {
    bytes = bytes.concat(Array.from(chunk))
    buffer = new TextDecoder().decode(new Uint8Array(bytes))
    if (isFirstChunk && hasBom(buffer)) {
      buffer = buffer.slice(BOM.length)
    }
    isFirstChunk = false
    const length = buffer.length
    let position = 0
    let discardTrailingNewline = false
    while (position < length) {
      if (discardTrailingNewline) {
        if (buffer[position] === '\n') {
          ++position
        }
        discardTrailingNewline = false
      }
      let lineLength = -1
      let fieldLength = startingFieldLength
      let character
      for (let index = startingPosition; lineLength < 0 && index < length; ++index) {
        character = buffer[index]
        if (character === ':' && fieldLength < 0) {
          fieldLength = index - position
        } else if (character === '\r') {
          discardTrailingNewline = true
          lineLength = index - position
        } else if (character === '\n') {
          lineLength = index - position
        }
      }
      if (lineLength < 0) {
        startingPosition = length - position
        startingFieldLength = fieldLength
        break
      } else {
        startingPosition = 0
        startingFieldLength = -1
      }
      parseEventStreamLine(buffer, position, fieldLength, lineLength)
      position += lineLength + 1
    }
    if (position === length) {
      bytes = []
      buffer = ''
    } else if (position > 0) {
      bytes = bytes.slice(new TextEncoder().encode(buffer.slice(0, position)).length)
      buffer = buffer.slice(position)
    }
  }

  function parseEventStreamLine(lineBuffer, index, fieldLength, lineLength) {
    if (lineLength === 0) {
      if (data.length > 0) {
        onParse({
          type: 'event',
          id: eventId,
          event: eventName || void 0,
          data: data.slice(0, -1),
          // remove trailing newline
        })

        data = ''
        eventId = void 0
      }
      eventName = void 0
      return
    }
    const noValue = fieldLength < 0
    const field = lineBuffer.slice(index, index + (noValue ? lineLength : fieldLength))
    let step = 0
    if (noValue) {
      step = lineLength
    } else if (lineBuffer[index + fieldLength + 1] === ' ') {
      step = fieldLength + 2
    } else {
      step = fieldLength + 1
    }
    const position = index + step
    const valueLength = lineLength - step
    const value = lineBuffer.slice(position, position + valueLength).toString()
    if (field === 'data') {
      data += value ? ''.concat(value, '\n') : '\n'
    } else if (field === 'event') {
      eventName = value
    } else if (field === 'id' && !value.includes('\0')) {
      eventId = value
    } else if (field === 'retry') {
      const retry = parseInt(value, 10)
      if (!Number.isNaN(retry)) {
        onParse({
          type: 'reconnect-interval',
          value: retry,
        })
      }
    }
  }
}
const BOM = [239, 187, 191]
function hasBom(buffer) {
  return BOM.every((charCode, index) => buffer.charCodeAt(index) === charCode)
}
export { createParser }
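
For orientation (not part of the commit): a minimal usage sketch of the vendored parser above. Unlike the upstream eventsource-parser package, whose feed() takes strings, this copy buffers raw bytes and decodes them itself, so multi-byte UTF-8 characters split across chunks still parse correctly. The SSE payloads below are invented.

// Hypothetical usage of the vendored parser; the payloads are made up.
import { createParser } from './eventsource-parser.mjs'

const parser = createParser((event) => {
  if (event.type === 'event') {
    // Fired once a blank line terminates an event block.
    console.log('event:', event.event, 'id:', event.id, 'data:', event.data)
  } else if (event.type === 'reconnect-interval') {
    // Fired when the server sends a "retry:" field.
    console.log('retry after', event.value, 'ms')
  }
})

const encoder = new TextEncoder()
// feed() takes byte chunks; partial lines and split UTF-8 sequences are buffered.
parser.feed(encoder.encode('data: {"text":"hel'))
parser.feed(encoder.encode('lo"}\n\n')) // completes the event -> data: {"text":"hello"}
parser.feed(encoder.encode('retry: 3000\n\n')) // emits a reconnect-interval event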
43 changes: 20 additions & 23 deletions src/utils/fetch-sse.mjs
@@ -1,5 +1,4 @@
-import { createParser } from 'eventsource-parser'
-import { streamAsyncIterable } from './stream-async-iterable'
+import { createParser } from './eventsource-parser.mjs'
 
 export async function fetchSSE(resource, options) {
   const { onMessage, onStart, onEnd, onError, ...fetchOptions } = options
@@ -13,34 +12,32 @@ export async function fetchSSE(resource, options) {
   }
   const parser = createParser((event) => {
     if (event.type === 'event') {
-      if (event.data === '[DONE]') {
-        onMessage(event.data)
-      } else {
-        try {
-          JSON.parse(event.data)
-          onMessage(event.data)
-        } catch (error) {
-          console.error('json error', error)
-          onMessage(
-            event.data
-              .replace(/^"|"$/g, '')
-              .replaceAll('\\"', '"')
-              .replaceAll('\\\\u', '\\u')
-              .replaceAll('\\\\n', '\\n'),
-          )
-        }
-      }
+      onMessage(event.data)
     }
   })
   let hasStarted = false
-  for await (const chunk of streamAsyncIterable(resp.body)) {
-    const str = new TextDecoder().decode(chunk)
-    parser.feed(str)
-
+  const reader = resp.body.getReader()
+  let result
+  while (!(result = await reader.read()).done) {
+    const chunk = result.value
     if (!hasStarted) {
+      const str = new TextDecoder().decode(chunk)
       hasStarted = true
       await onStart(str)
+
+      let fakeSseData
+      try {
+        const commonResponse = JSON.parse(str)
+        fakeSseData = 'data: ' + JSON.stringify(commonResponse) + '\n\ndata: [DONE]\n\n'
+      } catch (error) {
+        console.debug('not common response', error)
+      }
+      if (fakeSseData) {
+        parser.feed(new TextEncoder().encode(fakeSseData))
+        break
+      }
     }
+    parser.feed(chunk)
   }
   await onEnd()
 }
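
Again for orientation only: a sketch of how the reworked fetchSSE might be called. The callback names come from the destructuring above; the URL, headers, and request body are placeholders. If the first chunk parses as plain JSON (a non-streaming response), fetchSSE now replays it through the parser as a fake `data: ...` / `data: [DONE]` stream and stops reading, so onMessage sees the same shape either way.

// Hypothetical call site; endpoint, headers, and body are placeholders.
import { fetchSSE } from './fetch-sse.mjs'

await fetchSSE('https://example.invalid/v1/chat/completions', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ stream: true, messages: [{ role: 'user', content: 'Hi' }] }),
  onStart: async (firstChunkText) => {
    console.debug('first chunk:', firstChunkText)
  },
  onMessage: (data) => {
    if (data === '[DONE]') return // end-of-stream marker used by OpenAI-style APIs
    try {
      console.log(JSON.parse(data))
    } catch {
      console.log('non-JSON payload:', data)
    }
  },
  onEnd: async () => console.debug('stream finished'),
  onError: async (errOrResponse) => console.error('request failed:', errOrResponse),
})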
2 changes: 1 addition & 1 deletion src/utils/index.mjs
@@ -16,5 +16,5 @@ export * from './open-url'
 export * from './parse-float-with-clamp'
 export * from './parse-int-with-clamp'
 export * from './set-element-position-in-viewport'
-export * from './stream-async-iterable'
+export * from './eventsource-parser.mjs'
 export * from './update-ref-height'
14 changes: 0 additions & 14 deletions src/utils/stream-async-iterable.mjs

This file was deleted.
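
The deleted helper's contents are not reproduced in this diff. For readers unfamiliar with the pattern, a generic sketch of what a streamAsyncIterable helper conventionally looks like is shown below (not necessarily the exact deleted code); the new getReader()/read() loop in fetch-sse.mjs inlines this logic.

// Generic shape of such a helper, shown for context only; the actual deleted
// file's contents are not shown in this diff.
export async function* streamAsyncIterable(stream) {
  const reader = stream.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) return
      yield value
    }
  } finally {
    reader.releaseLock()
  }
}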
