Skip to content

Commit

Permalink
feat(common): add parseSSEStream function
Browse files Browse the repository at this point in the history
This function parses the raw buffer recieved from sse requests and emits completed chunks at O(n)
  • Loading branch information
apaparazzi0329 committed Nov 22, 2024
1 parent 00f065a commit 1998816
Showing 1 changed file with 99 additions and 0 deletions.
99 changes: 99 additions & 0 deletions lib/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os = require('os');
import { EventEmitter } from 'events';

// tslint:disable-next-line:no-var-requires
const pkg = require('../package.json');
Expand Down Expand Up @@ -30,3 +31,101 @@ export function getSdkHeaders(serviceName: string, serviceVersion: string, opera

return headers;
}

export function parseSSEStream(stream: NodeJS.ReadableStream) {
let eventMemoryKey = ''
let dataMemoryKey = ''
let stringBuffer = ''
let eventMessage = ''
const eventEmitter = new EventEmitter()

stream.on('data', (chunk: Buffer) => {
const chunkString = chunk.toString()

for (const char of chunkString) {
// Looks for "event: "
if (char === 'e' && eventMemoryKey === '') {
eventMemoryKey += char
} else if (char === 'v' && eventMemoryKey === 'e') {
eventMemoryKey += char
} else if (char === 'e' && eventMemoryKey === 'ev') {
eventMemoryKey += char
} else if (char === 'n' && eventMemoryKey === 'eve') {
eventMemoryKey += char
} else if (char === 't' && eventMemoryKey === 'even') {
eventMemoryKey += char
} else if (char === ':' && eventMemoryKey === 'event') {
eventMemoryKey += char
} else if (char === ' ' && eventMemoryKey === 'event:') {
eventMemoryKey += char
} else if (eventMemoryKey === 'event: ' && dataMemoryKey !== 'data: ') {
stringBuffer += char // Start saving characters in string buffer
} else if (eventMemoryKey === 'event: ' && dataMemoryKey === 'data: ') {
// Must remove extra 'event: ' added to stringBuffer
const trimmedStringBuffer = stringBuffer.substring(0, stringBuffer.length - 'event: '.length)

try {
const parsedBuffer = JSON.parse(trimmedStringBuffer)
eventEmitter.emit('data', {
event: eventMessage,
data: parsedBuffer
})
} catch(e) {
if (!(e instanceof SyntaxError)) {
throw Error('Unexpected error when trying to emit data event')
}
// stringBuffer was not parseable, we must continue and wait for more chunks
}

stringBuffer = char
dataMemoryKey = ''
} else {
eventMemoryKey = ''
}
// Looks for "data: "
if (char === 'd' && eventMemoryKey === 'event: ') {
dataMemoryKey += char
} else if (char === 'a' && dataMemoryKey === 'd') {
dataMemoryKey += char
} else if (char === 't' && dataMemoryKey === 'da') {
dataMemoryKey += char
} else if (char === 'a' && dataMemoryKey === 'dat') {
dataMemoryKey += char
} else if (char === ':' && dataMemoryKey === 'data') {
dataMemoryKey += char
} else if (char === ' ' && dataMemoryKey === 'data:') {
dataMemoryKey += char

// Must remove extra 'data: ' and '/n' added to stringBuffer
stringBuffer = stringBuffer.substring(0, stringBuffer.length - 'data: '.length - 1)

// Save and clear stringBuffer
eventMessage = stringBuffer
stringBuffer = ''
eventMemoryKey = ''
} else if (dataMemoryKey === 'data: ') {
stringBuffer += char // Start saving characters in string buffer
}
else {
dataMemoryKey = ''
}
}

try {
const parsedBuffer = JSON.parse(stringBuffer)
eventEmitter.emit('data', {
event: eventMessage,
data: parsedBuffer
})

stringBuffer = ''
} catch(e) {
if (!(e instanceof SyntaxError)) {
throw Error('Unexpected error when trying to emit data event')
}
// stringBuffer was not parseable, we must continue and wait for more chunks
}
});

return eventEmitter
}

0 comments on commit 1998816

Please sign in to comment.