diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.spec.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.spec.ts index 03de98d2039..fccfbb49e9c 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.spec.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.spec.ts @@ -1,4 +1,5 @@ import { Readable } from 'stream'; +import { finished } from 'stream/promises'; import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors'; import { PageBulkExportJobManager } from './page-bulk-export-job-manager'; @@ -62,13 +63,14 @@ describe('PageBulkExportJobManager', () => { }; const job = { _id: 'job2' } as any; jobManager.addJob(job); - expect(jobManager.jobQueue).toContain({ job }); + expect(jobManager.jobQueue.length).toBe(1); + expect(jobManager.jobQueue[0]).toEqual({ job }); expect(pageBulkExportServiceMock.executePageBulkExportJob).not.toHaveBeenCalled(); }); }); describe('updateJobStream', () => { - it('should update the stream for a job in progress', () => { + it('should set a new stream when there are no streams executing for the job', () => { const jobId = 'job1'; const mockStream = new Readable(); jobManager.jobsInProgress[jobId] = { stream: undefined }; @@ -76,33 +78,77 @@ describe('PageBulkExportJobManager', () => { expect(jobManager.jobsInProgress[jobId].stream).toBe(mockStream); }); - it('should destroy the existing stream if it has not finished', () => { + it('should set a new stream when previous stream is finished', async() => { const jobId = 'job1'; - const existingStream = new Readable(); - const mockStream = new Readable(); - vi.spyOn(existingStream, 'destroy'); - - jobManager.jobsInProgress[jobId] = { stream: existingStream }; - jobManager.updateJobStream(jobId, mockStream); + const oldStream = new Readable({ + read(size) { + // End the stream immediately + this.push(null); + }, + }); + oldStream.read(); + await finished(oldStream); + + const newStream = vi.fn().mockImplementation(() => { + const stream = new Readable(); + stream.destroy = vi.fn(); + return stream; + })() as unknown as Readable; + jobManager.addJob({ _id: jobId } as any); + jobManager.updateJobStream(jobId, oldStream); + + expect(oldStream.readableEnded).toBe(true); + jobManager.updateJobStream(jobId, newStream); + expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream); + }); - expect(existingStream.destroy).toHaveBeenCalledWith(new Error('Stream not finished before next stream started')); - expect(jobManager.jobsInProgress[jobId].stream).toBe(mockStream); + it('should destroy non-finished stream with an error before setting a new stream', () => { + const jobId = 'job1'; + const oldStream = vi.fn().mockImplementation(() => { + const stream = new Readable(); + stream.destroy = vi.fn(); + return stream; + })(); + + const newStream = new Readable(); + const destroySpy = vi.spyOn(oldStream, 'destroy'); + jobManager.addJob({ _id: jobId } as any); + jobManager.updateJobStream(jobId, oldStream); + + try { + jobManager.updateJobStream(jobId, newStream); + } + catch (error) { + expect(error).toBeInstanceOf(Error); + expect(destroySpy).toHaveBeenCalledWith(expect.any(Error)); + } + + expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream); }); - it('should destroy the new stream if the job is not in progress', () => { + it('should destroy the new stream with BulkExportJobExpiredError if job is not in progress', () => { const jobId = 'job1'; - const mockStream = new Readable(); - vi.spyOn(mockStream, 'destroy'); + const newStream = vi.fn().mockImplementation(() => { + const stream = new Readable(); + stream.destroy = vi.fn(); + return stream; + })(); + const destroySpy = vi.spyOn(newStream, 'destroy'); - jobManager.updateJobStream(jobId, mockStream); - expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError)); + jobManager.updateJobStream(jobId, newStream); + + expect(destroySpy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError)); }); }); describe('removeJobInProgressAndQueueNextJob', () => { it('should remove the job in progress and queue the next job', () => { const jobId = 'job1'; - const mockStream = new Readable(); + const mockStream = vi.fn().mockImplementation(() => { + const stream = new Readable(); + stream.destroy = vi.fn(); + return stream; + })(); vi.spyOn(mockStream, 'destroy'); const nextJob = { _id: 'job2' } as any; @@ -115,12 +161,16 @@ describe('PageBulkExportJobManager', () => { expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError)); expect(jobManager.jobsInProgress[jobId]).toBeUndefined(); expect(jobManager.jobsInProgress[nextJob._id.toString()]).toEqual({ stream: undefined }); - expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(nextJob); + expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(nextJob, undefined); }); it('should destroy the stream with a BulkExportJobRestartedError if job was restarted', () => { const jobId = 'job1'; - const mockStream = new Readable(); + const mockStream = vi.fn().mockImplementation(() => { + const stream = new Readable(); + stream.destroy = vi.fn(); + return stream; + })(); vi.spyOn(mockStream, 'destroy'); jobManager.jobsInProgress[jobId] = { stream: mockStream }; diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts index bd4c961ce2c..f148af5680d 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export/page-bulk-export-job-manager.ts @@ -70,6 +70,7 @@ export class PageBulkExportJobManager { updateJobStream(jobId: ObjectIdLike, stream: Readable): void { const jobInProgress = this.getJobInProgress(jobId); if (jobInProgress != null) { + console.log(jobInProgress.stream?.readableEnded); if (jobInProgress.stream != null && !jobInProgress.stream.readableEnded) { jobInProgress.stream.destroy(new Error('Stream not finished before next stream started')); } @@ -90,7 +91,7 @@ export class PageBulkExportJobManager { this.removeJobInProgress(jobId, isJobRestarted); if (this.jobQueue.length > 0) { - while (this.canExecuteNextJob()) { + while (this.canExecuteNextJob() && this.jobQueue.length > 0) { const nextJob = this.jobQueue.shift(); if (nextJob != null) { this.jobsInProgress[nextJob.job._id.toString()] = { stream: undefined };