From 1998816d79a7630ae3fb9e6a25cb392ac3c36c97 Mon Sep 17 00:00:00 2001 From: Angelo Paparazzi Date: Fri, 22 Nov 2024 10:16:32 -0500 Subject: [PATCH] feat(common): add parseSSEStream function This function parses the raw buffer recieved from sse requests and emits completed chunks at O(n) --- lib/common.ts | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/lib/common.ts b/lib/common.ts index 0f3abc17ea..0222db80c6 100644 --- a/lib/common.ts +++ b/lib/common.ts @@ -1,4 +1,5 @@ import os = require('os'); +import { EventEmitter } from 'events'; // tslint:disable-next-line:no-var-requires const pkg = require('../package.json'); @@ -30,3 +31,101 @@ export function getSdkHeaders(serviceName: string, serviceVersion: string, opera return headers; } + +export function parseSSEStream(stream: NodeJS.ReadableStream) { + let eventMemoryKey = '' + let dataMemoryKey = '' + let stringBuffer = '' + let eventMessage = '' + const eventEmitter = new EventEmitter() + + stream.on('data', (chunk: Buffer) => { + const chunkString = chunk.toString() + + for (const char of chunkString) { + // Looks for "event: " + if (char === 'e' && eventMemoryKey === '') { + eventMemoryKey += char + } else if (char === 'v' && eventMemoryKey === 'e') { + eventMemoryKey += char + } else if (char === 'e' && eventMemoryKey === 'ev') { + eventMemoryKey += char + } else if (char === 'n' && eventMemoryKey === 'eve') { + eventMemoryKey += char + } else if (char === 't' && eventMemoryKey === 'even') { + eventMemoryKey += char + } else if (char === ':' && eventMemoryKey === 'event') { + eventMemoryKey += char + } else if (char === ' ' && eventMemoryKey === 'event:') { + eventMemoryKey += char + } else if (eventMemoryKey === 'event: ' && dataMemoryKey !== 'data: ') { + stringBuffer += char // Start saving characters in string buffer + } else if (eventMemoryKey === 'event: ' && dataMemoryKey === 'data: ') { + // Must remove extra 'event: ' added to stringBuffer + const trimmedStringBuffer = stringBuffer.substring(0, stringBuffer.length - 'event: '.length) + + try { + const parsedBuffer = JSON.parse(trimmedStringBuffer) + eventEmitter.emit('data', { + event: eventMessage, + data: parsedBuffer + }) + } catch(e) { + if (!(e instanceof SyntaxError)) { + throw Error('Unexpected error when trying to emit data event') + } + // stringBuffer was not parseable, we must continue and wait for more chunks + } + + stringBuffer = char + dataMemoryKey = '' + } else { + eventMemoryKey = '' + } + // Looks for "data: " + if (char === 'd' && eventMemoryKey === 'event: ') { + dataMemoryKey += char + } else if (char === 'a' && dataMemoryKey === 'd') { + dataMemoryKey += char + } else if (char === 't' && dataMemoryKey === 'da') { + dataMemoryKey += char + } else if (char === 'a' && dataMemoryKey === 'dat') { + dataMemoryKey += char + } else if (char === ':' && dataMemoryKey === 'data') { + dataMemoryKey += char + } else if (char === ' ' && dataMemoryKey === 'data:') { + dataMemoryKey += char + + // Must remove extra 'data: ' and '/n' added to stringBuffer + stringBuffer = stringBuffer.substring(0, stringBuffer.length - 'data: '.length - 1) + + // Save and clear stringBuffer + eventMessage = stringBuffer + stringBuffer = '' + eventMemoryKey = '' + } else if (dataMemoryKey === 'data: ') { + stringBuffer += char // Start saving characters in string buffer + } + else { + dataMemoryKey = '' + } + } + + try { + const parsedBuffer = JSON.parse(stringBuffer) + eventEmitter.emit('data', { + event: eventMessage, + data: parsedBuffer + }) + + stringBuffer = '' + } catch(e) { + if (!(e instanceof SyntaxError)) { + throw Error('Unexpected error when trying to emit data event') + } + // stringBuffer was not parseable, we must continue and wait for more chunks + } + }); + + return eventEmitter +}