diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts
index 192ff0b0d58d7..02a392c28cf1a 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts
@@ -1,3 +1,4 @@
+import { S3Client } from '@aws-sdk/client-s3'
 import { captureException } from '@sentry/node'
 import {
     CODES,
@@ -62,7 +63,7 @@ export class SessionRecordingIngester {
     constructor(
         private config: PluginsServerConfig,
         private consumeOverflow: boolean,
-        private postgres: PostgresRouter,
+        postgres: PostgresRouter,
         batchConsumerFactory: BatchConsumerFactory,
         ingestionWarningProducer?: KafkaProducerWrapper
     ) {
@@ -75,6 +76,20 @@ export class SessionRecordingIngester {
 
         this.promiseScheduler = new PromiseScheduler()
 
+        let s3Client: S3Client | null = null
+        if (
+            config.SESSION_RECORDING_V2_S3_ENDPOINT &&
+            config.SESSION_RECORDING_V2_S3_REGION &&
+            config.SESSION_RECORDING_V2_S3_BUCKET &&
+            config.SESSION_RECORDING_V2_S3_PREFIX
+        ) {
+            s3Client = new S3Client({
+                region: config.SESSION_RECORDING_V2_S3_REGION,
+                endpoint: config.SESSION_RECORDING_V2_S3_ENDPOINT,
+                forcePathStyle: true,
+            })
+        }
+
         this.kafkaParser = new KafkaMessageParser()
         this.teamFilter = new TeamFilter(new TeamService(postgres))
         if (ingestionWarningProducer) {
@@ -85,16 +100,13 @@ export class SessionRecordingIngester {
         }
 
         const offsetManager = new KafkaOffsetManager(this.commitOffsets.bind(this), this.topic)
-        const writer =
-            this.config.SESSION_RECORDING_V2_S3_BUCKET &&
-            this.config.SESSION_RECORDING_V2_S3_REGION &&
-            this.config.SESSION_RECORDING_V2_S3_PREFIX
-                ? new S3SessionBatchWriter({
-                      bucket: this.config.SESSION_RECORDING_V2_S3_BUCKET,
-                      prefix: this.config.SESSION_RECORDING_V2_S3_PREFIX,
-                      region: this.config.SESSION_RECORDING_V2_S3_REGION,
-                  })
-                : new BlackholeSessionBatchWriter()
+        const writer = s3Client
+            ? new S3SessionBatchWriter(
+                  s3Client,
+                  this.config.SESSION_RECORDING_V2_S3_BUCKET!,
+                  this.config.SESSION_RECORDING_V2_S3_PREFIX!
+              )
+            : new BlackholeSessionBatchWriter()
         this.sessionBatchManager = new SessionBatchManager({
             maxBatchSizeBytes: (config.SESSION_RECORDING_MAX_BATCH_SIZE_KB ?? 1024) * 1024,
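
Note on the constructor wiring above: the truthiness checks run against config.*, while the later ternary reads this.config.*, so TypeScript cannot carry the narrowing across and the bucket and prefix need non-null assertions. A minimal sketch of one way to keep the narrowing in one place follows; the helper name and return shape are hypothetical, and only the config keys and S3Client options come from this diff.

import { S3Client } from '@aws-sdk/client-s3'

// Hypothetical helper (illustration only): either all four settings are present
// and we get a fully populated S3 setup, or we get null and the caller falls
// back to the blackhole writer. Avoids the non-null assertions later on.
interface SessionBatchS3Setup {
    client: S3Client
    bucket: string
    prefix: string
}

function buildSessionBatchS3Setup(config: {
    SESSION_RECORDING_V2_S3_ENDPOINT?: string
    SESSION_RECORDING_V2_S3_REGION?: string
    SESSION_RECORDING_V2_S3_BUCKET?: string
    SESSION_RECORDING_V2_S3_PREFIX?: string
}): SessionBatchS3Setup | null {
    const endpoint = config.SESSION_RECORDING_V2_S3_ENDPOINT
    const region = config.SESSION_RECORDING_V2_S3_REGION
    const bucket = config.SESSION_RECORDING_V2_S3_BUCKET
    const prefix = config.SESSION_RECORDING_V2_S3_PREFIX
    if (!endpoint || !region || !bucket || !prefix) {
        return null
    }
    // Same client options as the diff above: forcePathStyle is what
    // S3-compatible stores such as MinIO typically require.
    return { client: new S3Client({ region, endpoint, forcePathStyle: true }), bucket, prefix }
}

With something like this, the writer selection could read: setup ? new S3SessionBatchWriter(setup.client, setup.bucket, setup.prefix) : new BlackholeSessionBatchWriter(), with no assertions. This is only a sketch, not a required change to the diff.
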
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.test.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.test.ts
index 488d4ac2afe53..f13a3a5076f33 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.test.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.test.ts
@@ -5,7 +5,6 @@ import { Readable } from 'stream'
 import { status } from '../../../../utils/status'
 import { S3SessionBatchWriter } from './s3-session-batch-writer'
 
-jest.mock('@aws-sdk/client-s3')
 jest.mock('@aws-sdk/lib-storage')
 jest.mock('../../../../utils/status')
 
@@ -14,10 +13,11 @@ describe('S3SessionBatchWriter', () => {
     let mockUpload: jest.Mock
     let mockUploadDone: jest.Mock
     let uploadedData: Buffer
+    let mockS3Client: jest.Mocked<S3Client>
 
     beforeEach(() => {
         uploadedData = Buffer.alloc(0)
-        jest.mocked(S3Client).mockImplementation(() => ({} as any))
+        mockS3Client = {} as jest.Mocked<S3Client>
         mockUploadDone = jest.fn().mockImplementation(async () => {
             const stream = mockUpload.mock.calls[0][0].params.Body as Readable
             for await (const chunk of stream) {
@@ -32,11 +32,7 @@
             return { done: mockUploadDone } as unknown as Upload
         })
 
-        writer = new S3SessionBatchWriter({
-            bucket: 'test-bucket',
-            prefix: 'test-prefix',
-            region: 'test-region',
-        })
+        writer = new S3SessionBatchWriter(mockS3Client, 'test-bucket', 'test-prefix')
     })
 
     afterEach(() => {
@@ -44,14 +40,6 @@
         uploadedData = Buffer.alloc(0)
     })
 
-    it('should create an S3 client with the correct config', () => {
-        expect(S3Client).toHaveBeenCalledWith({ region: 'test-region' })
-        expect(status.info).toHaveBeenCalledWith('🔄', 's3_session_batch_writer_created', {
-            bucket: 'test-bucket',
-            prefix: 'test-prefix',
-        })
-    })
-
     describe('open()', () => {
         it('should pass the returned stream as the S3 upload body', () => {
             const { stream } = writer.newBatch()
@@ -59,8 +47,10 @@
             expect(mockUpload).toHaveBeenCalledTimes(1)
             expect(mockUpload).toHaveBeenCalledWith(
                 expect.objectContaining({
+                    client: mockS3Client,
                     params: expect.objectContaining({
                         Body: stream,
+                        Bucket: 'test-bucket',
                     }),
                 })
             )
diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.ts
index 88c942f7b5543..070a04c188bf4 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/s3-session-batch-writer.ts
@@ -10,14 +10,7 @@ import { SessionBatchFileWriter, StreamWithFinish } from './session-batch-file-w
  * Writes session batch files to S3
  */
 export class S3SessionBatchWriter implements SessionBatchFileWriter {
-    private readonly s3: S3Client
-    private readonly bucket: string
-    private readonly prefix: string
-
-    constructor(config: { bucket: string; prefix: string; region: string }) {
-        this.s3 = new S3Client({ region: config.region })
-        this.bucket = config.bucket
-        this.prefix = config.prefix
+    constructor(private readonly s3: S3Client, private readonly bucket: string, private readonly prefix: string) {
         status.info('🔄', 's3_session_batch_writer_created', { bucket: this.bucket, prefix: this.prefix })
     }
 
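
For context on what the updated tests assert: newBatch() is expected to hand the injected client, the configured bucket, and the returned stream to an Upload from @aws-sdk/lib-storage. A rough sketch of that pattern follows, assuming a streaming body; the function name and key format are placeholders, since the writer's real key layout is not shown in this diff.

import { S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { PassThrough } from 'stream'

// Sketch of the upload wiring the tests check for: the injected client, the
// configured bucket, and a streaming Body handed to lib-storage's Upload.
function startBatchUpload(s3: S3Client, bucket: string, prefix: string) {
    const body = new PassThrough()
    const upload = new Upload({
        client: s3,
        params: {
            Bucket: bucket,
            Key: `${prefix}/${Date.now()}`, // placeholder key, not the writer's actual format
            Body: body,
        },
    })
    // Callers write batch data into the body stream, end it, then await done().
    return { stream: body, done: upload.done() }
}
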
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 43dc70b410d2a..3d9318d91e348 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -339,6 +339,7 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig
 
     SESSION_RECORDING_V2_S3_BUCKET?: string
     SESSION_RECORDING_V2_S3_PREFIX?: string
+    SESSION_RECORDING_V2_S3_ENDPOINT?: string
     SESSION_RECORDING_V2_S3_REGION?: string
 }
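
With SESSION_RECORDING_V2_S3_ENDPOINT added to PluginsServerConfig and the client injected, the writer can be pointed at any S3-compatible store. A usage sketch follows; the endpoint, bucket, and prefix values are illustrative assumptions, and the relative import path is the one used by the sibling test file in this diff.

import { S3Client } from '@aws-sdk/client-s3'

import { S3SessionBatchWriter } from './s3-session-batch-writer'

// Illustrative values only (for example a local MinIO container): any endpoint
// that speaks the S3 API can be injected now that the client is a constructor argument.
const client = new S3Client({
    region: 'us-east-1',
    endpoint: 'http://127.0.0.1:19000',
    forcePathStyle: true,
})

const writer = new S3SessionBatchWriter(client, 'session-recordings', 'session_recordings')
const { stream } = writer.newBatch() // same call the updated test exercises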