Skip to content

Commit

Permalink
refactor: update recording v2 s3 env variables (PostHog#28140)
Browse files Browse the repository at this point in the history
  • Loading branch information
pl authored Feb 3, 2025
1 parent df2dc69 commit a3adbfb
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { S3Client } from '@aws-sdk/client-s3'
import { captureException } from '@sentry/node'
import {
CODES,
Expand Down Expand Up @@ -62,7 +63,7 @@ export class SessionRecordingIngester {
constructor(
private config: PluginsServerConfig,
private consumeOverflow: boolean,
private postgres: PostgresRouter,
postgres: PostgresRouter,
batchConsumerFactory: BatchConsumerFactory,
ingestionWarningProducer?: KafkaProducerWrapper
) {
Expand All @@ -75,6 +76,20 @@ export class SessionRecordingIngester {

this.promiseScheduler = new PromiseScheduler()

let s3Client: S3Client | null = null
if (
config.SESSION_RECORDING_V2_S3_ENDPOINT &&
config.SESSION_RECORDING_V2_S3_REGION &&
config.SESSION_RECORDING_V2_S3_BUCKET &&
config.SESSION_RECORDING_V2_S3_PREFIX
) {
s3Client = new S3Client({
region: config.SESSION_RECORDING_V2_S3_REGION,
endpoint: config.SESSION_RECORDING_V2_S3_ENDPOINT,
forcePathStyle: true,
})
}

this.kafkaParser = new KafkaMessageParser()
this.teamFilter = new TeamFilter(new TeamService(postgres))
if (ingestionWarningProducer) {
Expand All @@ -85,16 +100,13 @@ export class SessionRecordingIngester {
}

const offsetManager = new KafkaOffsetManager(this.commitOffsets.bind(this), this.topic)
const writer =
this.config.SESSION_RECORDING_V2_S3_BUCKET &&
this.config.SESSION_RECORDING_V2_S3_REGION &&
this.config.SESSION_RECORDING_V2_S3_PREFIX
? new S3SessionBatchWriter({
bucket: this.config.SESSION_RECORDING_V2_S3_BUCKET,
prefix: this.config.SESSION_RECORDING_V2_S3_PREFIX,
region: this.config.SESSION_RECORDING_V2_S3_REGION,
})
: new BlackholeSessionBatchWriter()
const writer = s3Client
? new S3SessionBatchWriter(
s3Client,
this.config.SESSION_RECORDING_V2_S3_BUCKET!,
this.config.SESSION_RECORDING_V2_S3_PREFIX!
)
: new BlackholeSessionBatchWriter()

this.sessionBatchManager = new SessionBatchManager({
maxBatchSizeBytes: (config.SESSION_RECORDING_MAX_BATCH_SIZE_KB ?? 1024) * 1024,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { Readable } from 'stream'
import { status } from '../../../../utils/status'
import { S3SessionBatchWriter } from './s3-session-batch-writer'

jest.mock('@aws-sdk/client-s3')
jest.mock('@aws-sdk/lib-storage')
jest.mock('../../../../utils/status')

Expand All @@ -14,10 +13,11 @@ describe('S3SessionBatchWriter', () => {
let mockUpload: jest.Mock
let mockUploadDone: jest.Mock
let uploadedData: Buffer
let mockS3Client: jest.Mocked<S3Client>

beforeEach(() => {
uploadedData = Buffer.alloc(0)
jest.mocked(S3Client).mockImplementation(() => ({} as any))
mockS3Client = {} as jest.Mocked<S3Client>
mockUploadDone = jest.fn().mockImplementation(async () => {
const stream = mockUpload.mock.calls[0][0].params.Body as Readable
for await (const chunk of stream) {
Expand All @@ -32,35 +32,25 @@ describe('S3SessionBatchWriter', () => {
return { done: mockUploadDone } as unknown as Upload
})

writer = new S3SessionBatchWriter({
bucket: 'test-bucket',
prefix: 'test-prefix',
region: 'test-region',
})
writer = new S3SessionBatchWriter(mockS3Client, 'test-bucket', 'test-prefix')
})

afterEach(() => {
jest.clearAllMocks()
uploadedData = Buffer.alloc(0)
})

it('should create an S3 client with the correct config', () => {
expect(S3Client).toHaveBeenCalledWith({ region: 'test-region' })
expect(status.info).toHaveBeenCalledWith('🔄', 's3_session_batch_writer_created', {
bucket: 'test-bucket',
prefix: 'test-prefix',
})
})

describe('open()', () => {
it('should pass the returned stream as the S3 upload body', () => {
const { stream } = writer.newBatch()

expect(mockUpload).toHaveBeenCalledTimes(1)
expect(mockUpload).toHaveBeenCalledWith(
expect.objectContaining({
client: mockS3Client,
params: expect.objectContaining({
Body: stream,
Bucket: 'test-bucket',
}),
})
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,7 @@ import { SessionBatchFileWriter, StreamWithFinish } from './session-batch-file-w
* Writes session batch files to S3
*/
export class S3SessionBatchWriter implements SessionBatchFileWriter {
private readonly s3: S3Client
private readonly bucket: string
private readonly prefix: string

constructor(config: { bucket: string; prefix: string; region: string }) {
this.s3 = new S3Client({ region: config.region })
this.bucket = config.bucket
this.prefix = config.prefix
// Accepts an externally constructed S3 client (dependency injection) along with the
// destination bucket and key prefix; all three are stored as readonly parameter properties.
// NOTE(review): the client is created by the caller — presumably so endpoint/region config
// and mocking in tests live outside this class; confirm against SessionRecordingIngester.
constructor(private readonly s3: S3Client, private readonly bucket: string, private readonly prefix: string) {
    // Emit a structured creation log so operators can see which bucket/prefix this writer targets.
    status.info('🔄', 's3_session_batch_writer_created', { bucket: this.bucket, prefix: this.prefix })
}

Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig

SESSION_RECORDING_V2_S3_BUCKET?: string
SESSION_RECORDING_V2_S3_PREFIX?: string
SESSION_RECORDING_V2_S3_ENDPOINT?: string
SESSION_RECORDING_V2_S3_REGION?: string
}

Expand Down

0 comments on commit a3adbfb

Please sign in to comment.