diff --git a/src/operators/reduce.ts b/src/operators/reduce.ts index d49c05b..9370ccd 100644 --- a/src/operators/reduce.ts +++ b/src/operators/reduce.ts @@ -19,14 +19,19 @@ export function reduce( loggers: Loggers, pendingDataControl?: PendingDataControl, ): OperatorApplier { - const { provides, keep } = reduceParams + const { provides, keep, seed } = reduceParams + const immutableSeed = Object.freeze(seed) let lastBag: ValueBag = {} function wrappedReduce(acc: T, valueBag: ValueBag, index: number): T { const startedAt = new Date() loggers.onStepStarted([valueBag]) + // RxJs doesn't create a structureClone from the seed parameter when start processing. + // Developer can implement function that mutate the "acc" on reduce.fn + // To safely avoid conflicts between different runs, we copy the seed when it's a new run + const renewedAcc = index === 0 ? structuredClone(immutableSeed) : acc try { - const reduceResult = reduceParams.fn(acc, valueBag, index) + const reduceResult = reduceParams.fn(renewedAcc, valueBag, index) loggers.onStepFinished([valueBag], startedAt) lastBag = valueBag return reduceResult @@ -40,7 +45,7 @@ export function reduce( return function operatorApplier(observable: Observable) { return observable - .pipe(reduceRxJs(wrappedReduce, reduceParams.seed)) + .pipe(reduceRxJs(wrappedReduce, seed)) .pipe(map((reduceResult: T) => getNewValueBag(pick(lastBag, keep ?? []), provides, reduceResult))) } } diff --git a/test/integration/reduce.test.ts b/test/integration/reduce.test.ts index cbaa70d..f51bbdb 100644 --- a/test/integration/reduce.test.ts +++ b/test/integration/reduce.test.ts @@ -1,4 +1,4 @@ -import { fromGenerator } from '../../src' +import { fromArray, fromFn, fromGenerator } from '../../src' import { sleep } from '../../src/utils/sleep' import { getMockedJobGenerator } from '../mocks/generator.mock' import { getOnStepFinishedParamsFixture } from '../mocks/stepResult.mock' @@ -112,4 +112,46 @@ describe('Reduce', () => { [getOnStepFinishedParamsFixture({ name: 'saveCount' })], ]) }) + test('Doesnt apply mutation to "seed" in different runs when reduce.fn mutates the "acc"', async () => { + const saveChild = jest.fn() + const saveParent = jest.fn() + const saveAcc = jest.fn() + const reduceChildFn = jest.fn().mockImplementation((acc) => acc + 1) + const reduceParentFn = jest.fn().mockImplementation((acc, bag) => { + acc.push(bag) + return acc + }) + + const childFlow = fromArray({ items: [1, 2, 3], provides: 'child' }) + .pipe({ fn: saveChild }) + .reduce({ fn: reduceChildFn, seed: 0, provides: 'countChild', name: 'childReduce' }) + + const parentFlow = fromArray({ items: [1, 2, 3], provides: 'parent' }, { maxItemsFlowing: 20 }) + .pipe({ fn: saveParent }) + .pipe({ fn: childFlow.run, provides: 'locations' }) + .reduce({ fn: reduceParentFn, seed: [], provides: 'accParent', name: 'parentReduce' }) + + const rootFlow = fromFn({ fn: () => 'ahoj', provides: 'greetings' }) + .pipe({ fn: parentFlow.run, provides: 'accounts' }) + .pipe({ fn: saveAcc }) + + const expectedOutput = { + greetings: 'ahoj', + accounts: { + accParent: [ + { greetings: 'ahoj', locations: { countChild: 3 }, parent: 1 }, + { greetings: 'ahoj', locations: { countChild: 3 }, parent: 2 }, + { greetings: 'ahoj', locations: { countChild: 3 }, parent: 3 }, + ], + }, + } + + await rootFlow.run() + expect(saveAcc).toHaveBeenCalledTimes(1) + expect(saveAcc).toHaveBeenNthCalledWith(1, expectedOutput) + + await rootFlow.run() + expect(saveAcc).toHaveBeenCalledTimes(2) + expect(saveAcc).toHaveBeenNthCalledWith(2, expectedOutput) + }) })