diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
index 3f0dbf3bc2437..501a7d5ce7ddc 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts
@@ -214,5 +214,69 @@ describe('KafkaMessageParser', () => {
             expect(results[0]?.session_id).toBe('session1')
             expect(results[1]?.session_id).toBe('session2')
         })
+
+        it('filters out events with zero or negative timestamps', async () => {
+            const snapshotItems = [
+                { type: 1, timestamp: 1234567890 },
+                { type: 2, timestamp: 0 },
+                { type: 3, timestamp: -1000 },
+                { type: 4, timestamp: 1234567891 },
+            ]
+            const messages = [
+                createMessage({
+                    data: JSON.stringify({
+                        event: '$snapshot_items',
+                        properties: {
+                            $session_id: 'session1',
+                            $window_id: 'window1',
+                            $snapshot_items: snapshotItems,
+                        },
+                    }),
+                }),
+            ]
+
+            const results = await parser.parseBatch(messages)
+
+            expect(results).toHaveLength(1)
+            expect(results[0]?.eventsByWindowId['window1']).toHaveLength(2)
+            expect(results[0]?.eventsByWindowId['window1']).toEqual([
+                { type: 1, timestamp: 1234567890 },
+                { type: 4, timestamp: 1234567891 },
+            ])
+            expect(results[0]?.eventsRange).toEqual({
+                start: 1234567890,
+                end: 1234567891,
+            })
+        })
+
+        it('uses min/max for timestamp range instead of first/last', async () => {
+            const snapshotItems = [
+                { type: 1, timestamp: 1234567890 }, // Not the smallest
+                { type: 2, timestamp: 1234567889 }, // Smallest
+                { type: 3, timestamp: 1234567892 }, // Not the largest
+                { type: 4, timestamp: 1234567893 }, // Largest
+                { type: 5, timestamp: 1234567891 }, // Middle
+            ]
+            const messages = [
+                createMessage({
+                    data: JSON.stringify({
+                        event: '$snapshot_items',
+                        properties: {
+                            $session_id: 'session1',
+                            $window_id: 'window1',
+                            $snapshot_items: snapshotItems,
+                        },
+                    }),
+                }),
+            ]
+
+            const results = await parser.parseBatch(messages)
+
+            expect(results).toHaveLength(1)
+            expect(results[0]?.eventsRange).toEqual({
+                start: 1234567889, // Should be smallest timestamp
+                end: 1234567893, // Should be largest timestamp
+            })
+        })
     })
 })
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts
index 7b0ec43814dd1..7084ab55ca20b 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts
@@ -60,12 +60,16 @@ export class KafkaMessageParser {
                 return dropMessage('received_non_snapshot_message')
             }
 
-            const events: RRWebEvent[] = $snapshot_items.filter((event: any) => event && event.timestamp)
+            const events: RRWebEvent[] = $snapshot_items.filter((event: any) => event && event.timestamp > 0)
 
             if (!events.length) {
                 return dropMessage('message_contained_no_valid_rrweb_events')
             }
 
+            const timestamps = events.map((event) => event.timestamp)
+            const minTimestamp = Math.min(...timestamps)
+            const maxTimestamp = Math.max(...timestamps)
+
             return {
                 metadata: {
                     partition: message.partition,
@@ -81,8 +85,8 @@ export class KafkaMessageParser {
                     [$window_id ?? '']: events,
                 },
                 eventsRange: {
-                    start: events[0].timestamp,
-                    end: events[events.length - 1].timestamp,
+                    start: minTimestamp,
+                    end: maxTimestamp,
                 },
                 snapshot_source: $snapshot_source,
             }
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
index 916af103fe189..e671f30623f86 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts
@@ -33,14 +33,29 @@ interface MessageMetadata {
 }
 
 export class SnappySessionRecorderMock {
-    constructor(private readonly sessionId: string, private readonly teamId: number) {}
-
     private chunks: Buffer[] = []
     private size: number = 0
+    private startTimestamp: number | null = null
+    private endTimestamp: number | null = null
+
+    constructor(public readonly sessionId: string, public readonly teamId: number) {}
 
     public recordMessage(message: ParsedMessageData): number {
         let bytesWritten = 0
 
+        if (message.eventsRange.start > 0) {
+            this.startTimestamp =
+                this.startTimestamp === null
+                    ? message.eventsRange.start
+                    : Math.min(this.startTimestamp, message.eventsRange.start)
+        }
+        if (message.eventsRange.end > 0) {
+            this.endTimestamp =
+                this.endTimestamp === null
+                    ? message.eventsRange.end
+                    : Math.max(this.endTimestamp, message.eventsRange.end)
+        }
+
         Object.entries(message.eventsByWindowId).forEach(([windowId, events]) => {
             events.forEach((event) => {
                 const serializedLine = JSON.stringify([windowId, event]) + '\n'
@@ -58,6 +73,8 @@ export class SnappySessionRecorderMock {
         return {
             buffer,
             eventCount: this.chunks.length,
+            startTimestamp: this.startTimestamp ?? 0,
+            endTimestamp: this.endTimestamp ?? 0,
         }
     }
 }
@@ -815,9 +832,14 @@ describe('SessionBatchRecorder', () => {
                 [
                     {
                         type: EventType.FullSnapshot,
-                        timestamp: 1000,
+                        timestamp: 2000,
                         data: { source: 1, adds: [{ parentId: 1, nextId: 2, node: { tag: 'div' } }] },
                     },
+                    {
+                        type: EventType.IncrementalSnapshot,
+                        timestamp: 3000,
+                        data: { source: 2, mutations: [{ id: 1 }] },
+                    },
                 ],
                 undefined,
                 42
@@ -827,9 +849,14 @@ describe('SessionBatchRecorder', () => {
                 [
                     {
                         type: EventType.Meta,
-                        timestamp: 1500,
+                        timestamp: 2500,
                         data: { href: 'https://example.com', width: 1024, height: 768 },
                     },
+                    {
+                        type: EventType.FullSnapshot,
+                        timestamp: 4500,
+                        data: { source: 1, snapshot: { html: '<div>2</div>' } },
+                    },
                 ],
                 undefined,
                 787
@@ -837,9 +864,14 @@ describe('SessionBatchRecorder', () => {
             createMessage(
                 'session3',
                 [
+                    {
+                        type: EventType.FullSnapshot,
+                        timestamp: 1000,
+                        data: { source: 1, snapshot: { html: '<div>3</div>' } },
+                    },
                     {
                         type: EventType.IncrementalSnapshot,
-                        timestamp: 2000,
+                        timestamp: 5000,
                         data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] },
                     },
                 ],
@@ -867,6 +899,12 @@ describe('SessionBatchRecorder', () => {
             session3: buffer3,
         }
 
+        const expectedTimestamps = {
+            session1: { start: 2000, end: 3000 },
+            session2: { start: 2500, end: 4500 },
+            session3: { start: 1000, end: 5000 },
+        }
+
         // Record messages in the batch recorder
         messages.forEach((message) => recorder.record(message))
 
@@ -885,6 +923,7 @@ describe('SessionBatchRecorder', () => {
         metadata.forEach((block) => {
             const expectedBuffer = expectedBuffers[block.sessionId as keyof typeof expectedBuffers]
             const blockData = streamOutput.slice(block.blockStartOffset, block.blockStartOffset + block.blockLength)
+            const expected = expectedTimestamps[block.sessionId as keyof typeof expectedTimestamps]
 
             const expectedTeamId = {
                 session1: 42,
@@ -895,6 +934,8 @@ describe('SessionBatchRecorder', () => {
             expect(block.teamId).toBe(expectedTeamId)
             expect(block.blockLength).toBe(expectedBuffer.length)
             expect(Buffer.from(blockData)).toEqual(expectedBuffer)
+            expect(block.startTimestamp).toBe(expected.start)
+            expect(block.endTimestamp).toBe(expected.end)
         })
 
         // Verify total length matches
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts
index 87066d5faf869..2fb4456c086fc 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts
@@ -45,10 +45,18 @@ import { SnappySessionRecorder } from './snappy-session-recorder'
  */
 
 export interface SessionBlockMetadata {
+    /** Unique identifier for the session */
     sessionId: string
+    /** ID of the team that owns this session recording */
     teamId: number
+    /** Byte offset where this session block starts in the batch file */
     blockStartOffset: number
+    /** Length of this session block in bytes */
     blockLength: number
+    /** Timestamp of the first event in the session block */
+    startTimestamp: number
+    /** Timestamp of the last event in the session block */
+    endTimestamp: number
 }
 
 export class SessionBatchRecorder {
@@ -159,7 +167,7 @@ export class SessionBatchRecorder {
 
         for (const sessions of this.partitionSessions.values()) {
             for (const recorder of sessions.values()) {
-                const { buffer, eventCount } = await recorder.end()
+                const { buffer, eventCount, startTimestamp, endTimestamp } = await recorder.end()
 
                 // Track block metadata
                 blockMetadata.push({
@@ -167,6 +175,8 @@ export class SessionBatchRecorder {
                     teamId: recorder.teamId,
                     blockStartOffset: currentOffset,
                     blockLength: buffer.length,
+                    startTimestamp,
+                    endTimestamp,
                 })
 
                 currentOffset += buffer.length
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-recorder.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-recorder.test.ts
deleted file mode 100644
index e0062e39645cc..0000000000000
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-recorder.test.ts
+++ /dev/null
@@ -1,191 +0,0 @@
-import { createGunzip } from 'zlib'
-
-import { ParsedMessageData } from '../kafka/types'
-import { SessionRecorder } from './session-recorder'
-
-// RRWeb event type constants
-const enum EventType {
-    DomContentLoaded = 0,
-    Load = 1,
-    FullSnapshot = 2,
-    IncrementalSnapshot = 3,
-    Meta = 4,
-    Custom = 5,
-}
-
-describe('SessionRecorder', () => {
-    let recorder: SessionRecorder
-
-    beforeEach(() => {
-        recorder = new SessionRecorder()
-    })
-
-    const createMessage = (windowId: string, events: any[]): ParsedMessageData => ({
-        distinct_id: 'distinct_id',
-        session_id: 'session_id',
-        eventsByWindowId: {
-            [windowId]: events,
-        },
-        eventsRange: {
-            start: events[0]?.timestamp || 0,
-            end: events[events.length - 1]?.timestamp || 0,
-        },
-        metadata: {
-            partition: 1,
-            topic: 'test',
-            offset: 0,
-            timestamp: 0,
-            rawSize: 0,
-        },
-    })
-
-    const readGzippedBuffer = async (buffer: Buffer): Promise<any[]> => {
-        return new Promise((resolve, reject) => {
-            const gunzip = createGunzip()
-            const chunks: Buffer[] = []
-
-            gunzip.on('data', (chunk) => {
-                chunks.push(chunk)
-            })
-
-            gunzip.on('end', () => {
-                try {
-                    const result = Buffer.concat(chunks)
-                        .toString()
-                        .trim()
-                        .split('\n')
-                        .map((line) => line.trim())
-                        .filter(Boolean)
-                        .map((line) => JSON.parse(line))
-                    resolve(result)
-                } catch (error) {
-                    reject(new Error(`Failed to process decompressed data: ${error.message}`))
-                }
-            })
-
-            gunzip.on('error', (error) => {
-                reject(new Error(`Error decompressing data: ${error.message}`))
-            })
-
-            // Write the entire buffer and end
-            gunzip.end(buffer)
-        })
-    }
-
-    describe('recordMessage', () => {
-        it('should record events in gzipped JSONL format', async () => {
-            const events = [
-                {
-                    type: EventType.FullSnapshot,
-                    timestamp: 1000,
-                    data: {
-                        source: 1,
-                        adds: [{ parentId: 1, nextId: 2, node: { tag: 'div', attrs: { class: 'test' } } }],
-                    },
-                },
-                {
-                    type: EventType.IncrementalSnapshot,
-                    timestamp: 2000,
-                    data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] },
-                },
-            ]
-            const message = createMessage('window1', events)
-
-            const rawBytesWritten = recorder.recordMessage(message)
-            expect(rawBytesWritten).toBeGreaterThan(0)
-
-            const { buffer, eventCount } = await recorder.end()
-            const lines = await readGzippedBuffer(buffer)
-
-            expect(lines).toEqual([
-                ['window1', events[0]],
-                ['window1', events[1]],
-            ])
-            expect(eventCount).toBe(2)
-        })
-
-        it('should handle multiple windows with multiple events', async () => {
-            const events = {
-                window1: [
-                    {
-                        type: EventType.Meta,
-                        timestamp: 1000,
-                        data: { href: 'https://example.com', width: 1024, height: 768 },
-                    },
-                    {
-                        type: EventType.FullSnapshot,
-                        timestamp: 1500,
-                        data: {
-                            source: 1,
-                            adds: [{ parentId: 1, nextId: null, node: { tag: 'h1', attrs: { id: 'title' } } }],
-                        },
-                    },
-                ],
-                window2: [
-                    {
-                        type: EventType.Custom,
-                        timestamp: 2000,
-                        data: { tag: 'user-interaction', payload: { type: 'click', target: '#submit-btn' } },
-                    },
-                    {
-                        type: EventType.IncrementalSnapshot,
-                        timestamp: 2500,
-                        data: { source: 3, mousemove: [{ x: 100, y: 200, id: 1 }] },
-                    },
-                ],
-            }
-            const message: ParsedMessageData = {
-                ...createMessage('', []),
-                eventsByWindowId: events,
-            }
-
-            recorder.recordMessage(message)
-            const { buffer, eventCount } = await recorder.end()
-            const lines = await readGzippedBuffer(buffer)
-
-            expect(lines).toEqual([
-                ['window1', events.window1[0]],
-                ['window1', events.window1[1]],
-                ['window2', events.window2[0]],
-                ['window2', events.window2[1]],
-            ])
-            expect(eventCount).toBe(4)
-        })
-
-        it('should handle empty events array', async () => {
-            const message = createMessage('window1', [])
-            recorder.recordMessage(message)
-
-            const { buffer, eventCount } = await recorder.end()
-            const lines = await readGzippedBuffer(buffer)
-
-            expect(lines).toEqual([])
-            expect(eventCount).toBe(0)
-        })
-
-        it('should handle large amounts of data', async () => {
-            const events = Array.from({ length: 10000 }, (_, i) => ({
-                type: EventType.Custom,
-                timestamp: i * 100,
-                data: { value: 'x'.repeat(1000) },
-            }))
-
-            // Split events into 100 messages of 100 events each
-            for (let i = 0; i < events.length; i += 100) {
-                const messageEvents = events.slice(i, i + 100)
-                const message = createMessage('window1', messageEvents)
-                recorder.recordMessage(message)
-            }
-
-            const { buffer, eventCount } = await recorder.end()
-            const lines = await readGzippedBuffer(buffer)
-
-            expect(lines.length).toBe(10000)
-            expect(eventCount).toBe(10000)
-
-            // Verify first and last events
-            expect(lines[0]).toEqual(['window1', events[0]])
-            expect(lines[lines.length - 1]).toEqual(['window1', events[events.length - 1]])
-        })
-    })
-})
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-recorder.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-recorder.ts
deleted file mode 100644
index 4e63d15f9d6cc..0000000000000
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-recorder.ts
+++ /dev/null
@@ -1,116 +0,0 @@
-import { createGzip } from 'zlib'
-
-import { ParsedMessageData } from '../kafka/types'
-
-export interface EndResult {
-    /** The complete compressed session block */
-    buffer: Buffer
-    /** Number of events in the session block */
-    eventCount: number
-}
-
-/**
- * Records events for a single session recording
- *
- * Buffers events and provides them as a gzipped session recording block that can be
- * stored in a session batch file. The session recording block can be read as an independent unit.
- *
- * ```
- * Session Batch File
- * ├── Gzipped Session Recording Block 1 <── One SessionRecorder corresponds to one block
- * │   └── JSONL Session Recording Block
- * │       ├── [windowId, event1]
- * │       ├── [windowId, event2]
- * │       └── ...
- * ├── Gzipped Session Recording Block 2
- * │   └── JSONL Session Recording Block
- * │       ├── [windowId, event1]
- * │       └── ...
- * └── ...
- * ```
- *
- * The session block format (after decompression) is a sequence of newline-delimited JSON records.
- * Each record is an array of [windowId, event].
- */
-export class SessionRecorder {
-    private readonly gzip: ReturnType<typeof createGzip>
-    private readonly chunks: Buffer[] = []
-    private eventCount: number = 0
-    private rawBytesWritten: number = 0
-    private ended = false
-    // Store any gzip error that occurs - these should be rare/never happen in practice
-    // We keep the error until end() to keep the recordMessage interface simple
-    private gzipError: Error | null = null
-
-    constructor() {
-        this.gzip = createGzip()
-        this.gzip.on('data', (chunk: Buffer) => {
-            this.chunks.push(chunk)
-        })
-        this.gzip.on('error', (error) => {
-            this.gzipError = error
-        })
-    }
-
-    /**
-     * Records a message containing events for this session
-     * Events are added to the gzip buffer immediately
-     *
-     * @param message - Message containing events for one or more windows
-     * @returns Number of raw bytes written (before compression)
-     * @throws If called after end()
-     */
-    public recordMessage(message: ParsedMessageData): number {
-        if (this.ended) {
-            throw new Error('Cannot record message after end() has been called')
-        }
-
-        let rawBytesWritten = 0
-
-        Object.entries(message.eventsByWindowId).forEach(([windowId, events]) => {
-            events.forEach((event) => {
-                const serializedLine = JSON.stringify([windowId, event]) + '\n'
-                this.gzip.write(serializedLine)
-                rawBytesWritten += Buffer.byteLength(serializedLine)
-                this.eventCount++
-            })
-        })
-
-        this.rawBytesWritten += rawBytesWritten
-        return rawBytesWritten
-    }
-
-    /**
-     * Finalizes and returns the compressed session block
-     *
-     * @returns The complete compressed session block and event count
-     * @throws If called more than once
-     */
-    public async end(): Promise<EndResult> {
-        if (this.ended) {
-            throw new Error('end() has already been called')
-        }
-        this.ended = true
-
-        return new Promise((resolve, reject) => {
-            if (this.gzipError) {
-                reject(this.gzipError)
-                return
-            }
-
-            this.gzip.on('end', () => {
-                if (this.gzipError) {
-                    reject(this.gzipError)
-                    return
-                }
-                resolve({
-                    // Buffer.concat typings are missing the signature with Buffer[]
-                    buffer: Buffer.concat(this.chunks as any[]),
-                    eventCount: this.eventCount,
-                })
-            })
-
-            this.gzip.end()
-        })
-    }
-}
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.test.ts
index 12a019fb0b521..9f245acbdf675 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.test.ts
@@ -182,4 +182,56 @@ describe('SnappySessionRecorder', () => {
             await expect(recorder.end()).rejects.toThrow('end() has already been called')
         })
     })
+
+    describe('timestamps', () => {
+        it('should track start and end timestamps from events range', async () => {
+            const events = [
+                {
+                    type: EventType.FullSnapshot,
+                    timestamp: 1000,
+                    data: { source: 1 },
+                },
+                {
+                    type: EventType.IncrementalSnapshot,
+                    timestamp: 2000,
+                    data: { source: 2 },
+                },
+            ]
+            const message = createMessage('window1', events)
+
+            recorder.recordMessage(message)
+            const result = await recorder.end()
+
+            expect(result.startTimestamp).toBe(1000)
+            expect(result.endTimestamp).toBe(2000)
+        })
+
+        it('should track min/max timestamps across multiple messages', async () => {
+            const messages = [
+                createMessage('window1', [
+                    { type: EventType.Meta, timestamp: 2000 },
+                    { type: EventType.FullSnapshot, timestamp: 3000 },
+                ]),
+                createMessage('window2', [
+                    { type: EventType.FullSnapshot, timestamp: 1000 },
+                    { type: EventType.IncrementalSnapshot, timestamp: 4000 },
+                ]),
+            ]
+
+            messages.forEach((message) => recorder.recordMessage(message))
+            const result = await recorder.end()
+
+            expect(result.startTimestamp).toBe(1000) // Min from all messages
+            expect(result.endTimestamp).toBe(4000) // Max from all messages
+        })
+
+        it('should handle empty events array', async () => {
+            const message = createMessage('window1', [])
+            recorder.recordMessage(message)
+            const result = await recorder.end()
+
+            expect(result.startTimestamp).toBe(0)
+            expect(result.endTimestamp).toBe(0)
+        })
+    })
 })
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.ts
index ccbe04278ae10..f602fd9d578c3 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/snappy-session-recorder.ts
@@ -7,6 +7,10 @@ export interface EndResult {
     buffer: Buffer
     /** Number of events in the session block */
     eventCount: number
+    /** Timestamp of the first event in the session block */
+    startTimestamp: number
+    /** Timestamp of the last event in the session block */
+    endTimestamp: number
 }
 
 /**
@@ -37,6 +41,8 @@ export class SnappySessionRecorder {
     private eventCount: number = 0
     private rawBytesWritten: number = 0
     private ended = false
+    private startTimestamp: number | null = null
+    private endTimestamp: number | null = null
 
     constructor(public readonly sessionId: string, public readonly teamId: number) {}
 
@@ -55,6 +61,17 @@ export class SnappySessionRecorder {
 
         let rawBytesWritten = 0
 
+        // Note: We don't need to check for zero timestamps here because:
+        // 1. KafkaMessageParser filters out events with zero timestamps
+        // 2. KafkaMessageParser drops messages with no events
+        // Therefore, eventsRange.start and eventsRange.end will always be present and non-zero
+        this.startTimestamp =
+            this.startTimestamp === null
+                ? message.eventsRange.start
+                : Math.min(this.startTimestamp, message.eventsRange.start)
+        this.endTimestamp =
+            this.endTimestamp === null ? message.eventsRange.end : Math.max(this.endTimestamp, message.eventsRange.end)
+
         Object.entries(message.eventsByWindowId).forEach(([windowId, events]) => {
             events.forEach((event) => {
                 const serializedLine = JSON.stringify([windowId, event]) + '\n'
@@ -88,6 +105,8 @@ export class SnappySessionRecorder {
         return {
             buffer,
             eventCount: this.eventCount,
+            startTimestamp: this.startTimestamp ?? 0,
+            endTimestamp: this.endTimestamp ?? 0,
         }
     }
 }