Skip to content

Commit

Permalink
feat: track timestamps for session blocks (PostHog#28189)
Browse files Browse the repository at this point in the history
  • Loading branch information
pl authored Feb 3, 2025
1 parent 2c4bcdf commit 890e2dd
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,69 @@ describe('KafkaMessageParser', () => {
expect(results[0]?.session_id).toBe('session1')
expect(results[1]?.session_id).toBe('session2')
})

it('filters out events with zero or negative timestamps', async () => {
const snapshotItems = [
{ type: 1, timestamp: 1234567890 },
{ type: 2, timestamp: 0 },
{ type: 3, timestamp: -1000 },
{ type: 4, timestamp: 1234567891 },
]
const messages = [
createMessage({
data: JSON.stringify({
event: '$snapshot_items',
properties: {
$session_id: 'session1',
$window_id: 'window1',
$snapshot_items: snapshotItems,
},
}),
}),
]

const results = await parser.parseBatch(messages)

expect(results).toHaveLength(1)
expect(results[0]?.eventsByWindowId['window1']).toHaveLength(2)
expect(results[0]?.eventsByWindowId['window1']).toEqual([
{ type: 1, timestamp: 1234567890 },
{ type: 4, timestamp: 1234567891 },
])
expect(results[0]?.eventsRange).toEqual({
start: 1234567890,
end: 1234567891,
})
})

it('uses min/max for timestamp range instead of first/last', async () => {
const snapshotItems = [
{ type: 1, timestamp: 1234567890 }, // Not the smallest
{ type: 2, timestamp: 1234567889 }, // Smallest
{ type: 3, timestamp: 1234567892 }, // Not the largest
{ type: 4, timestamp: 1234567893 }, // Largest
{ type: 5, timestamp: 1234567891 }, // Middle
]
const messages = [
createMessage({
data: JSON.stringify({
event: '$snapshot_items',
properties: {
$session_id: 'session1',
$window_id: 'window1',
$snapshot_items: snapshotItems,
},
}),
}),
]

const results = await parser.parseBatch(messages)

expect(results).toHaveLength(1)
expect(results[0]?.eventsRange).toEqual({
start: 1234567889, // Should be smallest timestamp
end: 1234567893, // Should be largest timestamp
})
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,16 @@ export class KafkaMessageParser {
return dropMessage('received_non_snapshot_message')
}

const events: RRWebEvent[] = $snapshot_items.filter((event: any) => event && event.timestamp)
const events: RRWebEvent[] = $snapshot_items.filter((event: any) => event && event.timestamp > 0)

if (!events.length) {
return dropMessage('message_contained_no_valid_rrweb_events')
}

const timestamps = events.map((event) => event.timestamp)
const minTimestamp = Math.min(...timestamps)
const maxTimestamp = Math.max(...timestamps)

return {
metadata: {
partition: message.partition,
Expand All @@ -81,8 +85,8 @@ export class KafkaMessageParser {
[$window_id ?? '']: events,
},
eventsRange: {
start: events[0].timestamp,
end: events[events.length - 1].timestamp,
start: minTimestamp,
end: maxTimestamp,
},
snapshot_source: $snapshot_source,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,29 @@ interface MessageMetadata {
}

export class SnappySessionRecorderMock {
constructor(private readonly sessionId: string, private readonly teamId: number) {}

private chunks: Buffer[] = []
private size: number = 0
private startTimestamp: number | null = null
private endTimestamp: number | null = null

constructor(public readonly sessionId: string, public readonly teamId: number) {}

public recordMessage(message: ParsedMessageData): number {
let bytesWritten = 0

if (message.eventsRange.start > 0) {
this.startTimestamp =
this.startTimestamp === null
? message.eventsRange.start
: Math.min(this.startTimestamp, message.eventsRange.start)
}
if (message.eventsRange.end > 0) {
this.endTimestamp =
this.endTimestamp === null
? message.eventsRange.end
: Math.max(this.endTimestamp, message.eventsRange.end)
}

Object.entries(message.eventsByWindowId).forEach(([windowId, events]) => {
events.forEach((event) => {
const serializedLine = JSON.stringify([windowId, event]) + '\n'
Expand All @@ -58,6 +73,8 @@ export class SnappySessionRecorderMock {
return {
buffer,
eventCount: this.chunks.length,
startTimestamp: this.startTimestamp ?? 0,
endTimestamp: this.endTimestamp ?? 0,
}
}
}
Expand Down Expand Up @@ -815,9 +832,14 @@ describe('SessionBatchRecorder', () => {
[
{
type: EventType.FullSnapshot,
timestamp: 1000,
timestamp: 2000,
data: { source: 1, adds: [{ parentId: 1, nextId: 2, node: { tag: 'div' } }] },
},
{
type: EventType.IncrementalSnapshot,
timestamp: 3000,
data: { source: 2, mutations: [{ id: 1 }] },
},
],
undefined,
42
Expand All @@ -827,19 +849,29 @@ describe('SessionBatchRecorder', () => {
[
{
type: EventType.Meta,
timestamp: 1500,
timestamp: 2500,
data: { href: 'https://example.com', width: 1024, height: 768 },
},
{
type: EventType.FullSnapshot,
timestamp: 4500,
data: { source: 1, snapshot: { html: '<div>2</div>' } },
},
],
undefined,
787
),
createMessage(
'session3',
[
{
type: EventType.FullSnapshot,
timestamp: 1000,
data: { source: 1, snapshot: { html: '<div>3</div>' } },
},
{
type: EventType.IncrementalSnapshot,
timestamp: 2000,
timestamp: 5000,
data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] },
},
],
Expand Down Expand Up @@ -867,6 +899,12 @@ describe('SessionBatchRecorder', () => {
session3: buffer3,
}

const expectedTimestamps = {
session1: { start: 2000, end: 3000 },
session2: { start: 2500, end: 4500 },
session3: { start: 1000, end: 5000 },
}

// Record messages in the batch recorder
messages.forEach((message) => recorder.record(message))

Expand All @@ -885,6 +923,7 @@ describe('SessionBatchRecorder', () => {
metadata.forEach((block) => {
const expectedBuffer = expectedBuffers[block.sessionId as keyof typeof expectedBuffers]
const blockData = streamOutput.slice(block.blockStartOffset, block.blockStartOffset + block.blockLength)
const expected = expectedTimestamps[block.sessionId as keyof typeof expectedTimestamps]

const expectedTeamId = {
session1: 42,
Expand All @@ -895,6 +934,8 @@ describe('SessionBatchRecorder', () => {
expect(block.teamId).toBe(expectedTeamId)
expect(block.blockLength).toBe(expectedBuffer.length)
expect(Buffer.from(blockData)).toEqual(expectedBuffer)
expect(block.startTimestamp).toBe(expected.start)
expect(block.endTimestamp).toBe(expected.end)
})

// Verify total length matches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ import { SnappySessionRecorder } from './snappy-session-recorder'
*/

export interface SessionBlockMetadata {
/** Unique identifier for the session */
sessionId: string
/** ID of the team that owns this session recording */
teamId: number
/** Byte offset where this session block starts in the batch file */
blockStartOffset: number
/** Length of this session block in bytes */
blockLength: number
/** Timestamp of the first event in the session block */
startTimestamp: number
/** Timestamp of the last event in the session block */
endTimestamp: number
}

export class SessionBatchRecorder {
Expand Down Expand Up @@ -159,14 +167,16 @@ export class SessionBatchRecorder {

for (const sessions of this.partitionSessions.values()) {
for (const recorder of sessions.values()) {
const { buffer, eventCount } = await recorder.end()
const { buffer, eventCount, startTimestamp, endTimestamp } = await recorder.end()

// Track block metadata
blockMetadata.push({
sessionId: recorder.sessionId,
teamId: recorder.teamId,
blockStartOffset: currentOffset,
blockLength: buffer.length,
startTimestamp,
endTimestamp,
})
currentOffset += buffer.length

Expand Down
Loading

0 comments on commit 890e2dd

Please sign in to comment.