Skip to content

Commit

Permalink
Avoid seed mutation even when developer does mutation in .reduce "acc…
Browse files Browse the repository at this point in the history
…" parameter
  • Loading branch information
Pedro Kehl authored and pedrokehl committed Mar 13, 2024
1 parent a9ed787 commit c1de2a5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
11 changes: 8 additions & 3 deletions src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ export function reduce<T>(
loggers: Loggers,
pendingDataControl?: PendingDataControl,
): OperatorApplier {
const { provides, keep } = reduceParams
const { provides, keep, seed } = reduceParams
const immutableSeed = Object.freeze(seed)
let lastBag: ValueBag = {}

function wrappedReduce(acc: T, valueBag: ValueBag, index: number): T {
const startedAt = new Date()
loggers.onStepStarted([valueBag])
// RxJs doesn't create a structureClone from the seed parameter when start processing.
// Developer can implement function that mutate the "acc" on reduce.fn
// To safely avoid conflicts between different runs, we copy the seed when it's a new run
const renewedAcc = index === 0 ? structuredClone(immutableSeed) : acc
try {
const reduceResult = reduceParams.fn(acc, valueBag, index)
const reduceResult = reduceParams.fn(renewedAcc, valueBag, index)
loggers.onStepFinished([valueBag], startedAt)
lastBag = valueBag
return reduceResult
Expand All @@ -40,7 +45,7 @@ export function reduce<T>(

return function operatorApplier(observable: Observable<ValueBag>) {
return observable
.pipe(reduceRxJs(wrappedReduce, reduceParams.seed))
.pipe(reduceRxJs(wrappedReduce, seed))
.pipe(map((reduceResult: T) => getNewValueBag(pick(lastBag, keep ?? []), provides, reduceResult)))
}
}
44 changes: 43 additions & 1 deletion test/integration/reduce.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fromGenerator } from '../../src'
import { fromArray, fromFn, fromGenerator } from '../../src'
import { sleep } from '../../src/utils/sleep'
import { getMockedJobGenerator } from '../mocks/generator.mock'
import { getOnStepFinishedParamsFixture } from '../mocks/stepResult.mock'
Expand Down Expand Up @@ -112,4 +112,46 @@ describe('Reduce', () => {
[getOnStepFinishedParamsFixture({ name: 'saveCount' })],
])
})
test('Doesnt apply mutation to "seed" in different runs when reduce.fn mutates the "acc"', async () => {
const saveChild = jest.fn()
const saveParent = jest.fn()
const saveAcc = jest.fn()
const reduceChildFn = jest.fn().mockImplementation((acc) => acc + 1)
const reduceParentFn = jest.fn().mockImplementation((acc, bag) => {
acc.push(bag)
return acc
})

const childFlow = fromArray({ items: [1, 2, 3], provides: 'child' })
.pipe({ fn: saveChild })
.reduce({ fn: reduceChildFn, seed: 0, provides: 'countChild', name: 'childReduce' })

const parentFlow = fromArray({ items: [1, 2, 3], provides: 'parent' }, { maxItemsFlowing: 20 })
.pipe({ fn: saveParent })
.pipe({ fn: childFlow.run, provides: 'locations' })
.reduce({ fn: reduceParentFn, seed: [], provides: 'accParent', name: 'parentReduce' })

const rootFlow = fromFn({ fn: () => 'ahoj', provides: 'greetings' })
.pipe({ fn: parentFlow.run, provides: 'accounts' })
.pipe({ fn: saveAcc })

const expectedOutput = {
greetings: 'ahoj',
accounts: {
accParent: [
{ greetings: 'ahoj', locations: { countChild: 3 }, parent: 1 },
{ greetings: 'ahoj', locations: { countChild: 3 }, parent: 2 },
{ greetings: 'ahoj', locations: { countChild: 3 }, parent: 3 },
],
},
}

await rootFlow.run()
expect(saveAcc).toHaveBeenCalledTimes(1)
expect(saveAcc).toHaveBeenNthCalledWith(1, expectedOutput)

await rootFlow.run()
expect(saveAcc).toHaveBeenCalledTimes(2)
expect(saveAcc).toHaveBeenNthCalledWith(2, expectedOutput)
})
})

0 comments on commit c1de2a5

Please sign in to comment.