Commit fde0fa8
fix PageBulkExportJobManager tests
arafubeatbox committed Aug 27, 2024
1 parent 665b486 commit fde0fa8
Showing 2 changed files with 71 additions and 20 deletions.
Changed file 1 of 2 — PageBulkExportJobManager tests (spec file):
@@ -1,4 +1,5 @@
 import { Readable } from 'stream';
+import { finished } from 'stream/promises';
 
 import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors';
 import { PageBulkExportJobManager } from './page-bulk-export-job-manager';
@@ -62,47 +63,92 @@ describe('PageBulkExportJobManager', () => {
       };
       const job = { _id: 'job2' } as any;
       jobManager.addJob(job);
-      expect(jobManager.jobQueue).toContain({ job });
+      expect(jobManager.jobQueue.length).toBe(1);
+      expect(jobManager.jobQueue[0]).toEqual({ job });
       expect(pageBulkExportServiceMock.executePageBulkExportJob).not.toHaveBeenCalled();
     });
   });
 
   describe('updateJobStream', () => {
-    it('should update the stream for a job in progress', () => {
+    it('should set a new stream when there are no streams executing for the job', () => {
       const jobId = 'job1';
       const mockStream = new Readable();
       jobManager.jobsInProgress[jobId] = { stream: undefined };
       jobManager.updateJobStream(jobId, mockStream);
       expect(jobManager.jobsInProgress[jobId].stream).toBe(mockStream);
     });
 
-    it('should destroy the existing stream if it has not finished', () => {
+    it('should set a new stream when previous stream is finished', async() => {
       const jobId = 'job1';
-      const existingStream = new Readable();
-      const mockStream = new Readable();
-      vi.spyOn(existingStream, 'destroy');
+      const oldStream = new Readable({
+        read(size) {
+          // End the stream immediately
+          this.push(null);
+        },
+      });
+      oldStream.read();
+      await finished(oldStream);
 
-      jobManager.jobsInProgress[jobId] = { stream: existingStream };
-      jobManager.updateJobStream(jobId, mockStream);
+      const newStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })() as unknown as Readable;
+      jobManager.addJob({ _id: jobId } as any);
+      jobManager.updateJobStream(jobId, oldStream);
 
-      expect(existingStream.destroy).toHaveBeenCalledWith(new Error('Stream not finished before next stream started'));
-      expect(jobManager.jobsInProgress[jobId].stream).toBe(mockStream);
+      expect(oldStream.readableEnded).toBe(true);
+      jobManager.updateJobStream(jobId, newStream);
+      expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream);
+    });
+
+    it('should destroy non-finished stream with an error before setting a new stream', () => {
+      const jobId = 'job1';
+      const oldStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
+
+      const newStream = new Readable();
+      const destroySpy = vi.spyOn(oldStream, 'destroy');
+      jobManager.addJob({ _id: jobId } as any);
+      jobManager.updateJobStream(jobId, oldStream);
+
+      try {
+        jobManager.updateJobStream(jobId, newStream);
+      }
+      catch (error) {
+        expect(error).toBeInstanceOf(Error);
+        expect(destroySpy).toHaveBeenCalledWith(expect.any(Error));
+      }
+
+      expect(jobManager.getJobInProgress(jobId)?.stream).toBe(newStream);
     });
 
-    it('should destroy the new stream if the job is not in progress', () => {
+    it('should destroy the new stream with BulkExportJobExpiredError if job is not in progress', () => {
       const jobId = 'job1';
-      const mockStream = new Readable();
-      vi.spyOn(mockStream, 'destroy');
+      const newStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
+      const destroySpy = vi.spyOn(newStream, 'destroy');
 
-      jobManager.updateJobStream(jobId, mockStream);
-      expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
+      jobManager.updateJobStream(jobId, newStream);
+
+      expect(destroySpy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
     });
   });
 
   describe('removeJobInProgressAndQueueNextJob', () => {
     it('should remove the job in progress and queue the next job', () => {
       const jobId = 'job1';
-      const mockStream = new Readable();
+      const mockStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
       vi.spyOn(mockStream, 'destroy');
 
       const nextJob = { _id: 'job2' } as any;
@@ -115,12 +161,16 @@ describe('PageBulkExportJobManager', () => {
       expect(mockStream.destroy).toHaveBeenCalledWith(expect.any(BulkExportJobExpiredError));
       expect(jobManager.jobsInProgress[jobId]).toBeUndefined();
       expect(jobManager.jobsInProgress[nextJob._id.toString()]).toEqual({ stream: undefined });
-      expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(nextJob);
+      expect(pageBulkExportServiceMock.executePageBulkExportJob).toHaveBeenCalledWith(nextJob, undefined);
     });
 
     it('should destroy the stream with a BulkExportJobRestartedError if job was restarted', () => {
       const jobId = 'job1';
-      const mockStream = new Readable();
+      const mockStream = vi.fn().mockImplementation(() => {
+        const stream = new Readable();
+        stream.destroy = vi.fn();
+        return stream;
+      })();
       vi.spyOn(mockStream, 'destroy');
 
       jobManager.jobsInProgress[jobId] = { stream: mockStream };
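A note on the first assertion change above: the original `expect(jobManager.jobQueue).toContain({ job })` can never pass, because `toContain` matches array elements by reference, while the queue holds a wrapper object that `addJob` creates internally. A minimal standalone sketch (assuming vitest; the `queue` and `job` names here are illustrative, not from the repository) of the matcher semantics behind the rewrite:

import { describe, expect, it } from 'vitest';

describe('matcher semantics behind the jobQueue assertion', () => {
  it('toContain compares object elements by reference; toEqual compares structurally', () => {
    const job = { _id: 'job2' };
    const queue = [{ job }]; // stand-in for the wrapper object addJob pushes internally

    // A fresh { job } literal is a different reference, so toContain finds no match
    expect(queue).not.toContain({ job });

    // toEqual recursively compares properties, so the rewritten assertion passes
    expect(queue[0]).toEqual({ job });
  });
});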
Changed file 2 of 2 — PageBulkExportJobManager implementation:

@@ -70,6 +70,7 @@ export class PageBulkExportJobManager {
   updateJobStream(jobId: ObjectIdLike, stream: Readable): void {
     const jobInProgress = this.getJobInProgress(jobId);
     if (jobInProgress != null) {
+      console.log(jobInProgress.stream?.readableEnded);
       if (jobInProgress.stream != null && !jobInProgress.stream.readableEnded) {
         jobInProgress.stream.destroy(new Error('Stream not finished before next stream started'));
       }
@@ -90,7 +91,7 @@ export class PageBulkExportJobManager {
     this.removeJobInProgress(jobId, isJobRestarted);
 
     if (this.jobQueue.length > 0) {
-      while (this.canExecuteNextJob()) {
+      while (this.canExecuteNextJob() && this.jobQueue.length > 0) {
         const nextJob = this.jobQueue.shift();
         if (nextJob != null) {
           this.jobsInProgress[nextJob.job._id.toString()] = { stream: undefined };
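The extra `this.jobQueue.length > 0` condition in the last hunk is easiest to see in isolation: if `canExecuteNextJob()` only checks a concurrency cap, draining the queue never flips the loop condition to false, and `shift()` on an empty array just keeps returning `undefined`. A self-contained sketch (not the actual class; the cap and field shapes are assumptions for illustration) of why the added guard terminates the loop:

import { Readable } from 'stream';

type QueuedJob = { job: { _id: string } };

// Simplified stand-in for PageBulkExportJobManager's queueing state
class JobQueueSketch {
  jobQueue: QueuedJob[] = [];
  jobsInProgress: Record<string, { stream: Readable | undefined }> = {};

  private readonly maxConcurrentJobs = 5; // hypothetical cap

  private canExecuteNextJob(): boolean {
    // Depends only on how many jobs are running, not on queue length
    return Object.keys(this.jobsInProgress).length < this.maxConcurrentJobs;
  }

  queueNextJobs(): void {
    // Without `this.jobQueue.length > 0`, an empty queue would spin forever:
    // shift() returns undefined, jobsInProgress never grows, and
    // canExecuteNextJob() stays true on every iteration.
    while (this.canExecuteNextJob() && this.jobQueue.length > 0) {
      const nextJob = this.jobQueue.shift();
      if (nextJob != null) {
        this.jobsInProgress[nextJob.job._id.toString()] = { stream: undefined };
      }
    }
  }
}

// Usage: drains up to the cap, then stops cleanly once the queue empties
const sketch = new JobQueueSketch();
sketch.jobQueue.push({ job: { _id: 'job1' } }, { job: { _id: 'job2' } });
sketch.queueNextJobs();
console.log(Object.keys(sketch.jobsInProgress)); // ['job1', 'job2']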
