From a42ba57a6833c161bca00b2b37c2e3f61f6771f4 Mon Sep 17 00:00:00 2001 From: Kjell-Morten Date: Sat, 28 Oct 2023 14:35:30 +0200 Subject: [PATCH] Refactor --- src/jobs/Step.test.ts | 4 +- src/jobs/Step.ts | 228 +++++++++++++++++++-------------------- src/utils/action.test.ts | 52 +++++++++ src/utils/action.ts | 12 ++- src/utils/response.ts | 2 +- 5 files changed, 176 insertions(+), 122 deletions(-) diff --git a/src/jobs/Step.test.ts b/src/jobs/Step.test.ts index 04e53244..0c5c4913 100644 --- a/src/jobs/Step.test.ts +++ b/src/jobs/Step.test.ts @@ -672,7 +672,7 @@ test('should mutate action into several actions based on iterate pipeline', asyn type: 'SET', payload: { type: 'entry' }, response: { status: 'ok', data: [{ id: 'ent3', title: 'Entry 3' }] }, - meta: undefined, + meta, }, setItem_0: { ...expectedAction0, @@ -782,7 +782,7 @@ test('should mutate action into several actions based on iterate path', async (t { id: 'ent3', title: 'Entry 3' }, ], }, - meta: undefined, + meta, }, setItem_0: { ...expectedAction0, diff --git a/src/jobs/Step.ts b/src/jobs/Step.ts index 1c5478c4..6694318d 100644 --- a/src/jobs/Step.ts +++ b/src/jobs/Step.ts @@ -6,6 +6,8 @@ import { setDataOnActionPayload, setResponseOnAction, setOriginOnAction, + setErrorOnAction, + setMetaOnAction, } from '../utils/action.js' import { combineResponses, setOrigin } from '../utils/response.js' import validateFilters from '../utils/validateFilters.js' @@ -172,10 +174,8 @@ function responseFromSteps( } } -function generateIterateResponse( - action: Action, - actionsWithResponses: Action[], -) { +function generateIterateResponse(action: Action, responses: ResponsesObject) { + const actionsWithResponses = Object.values(responses) const errorResponses = actionsWithResponses .map(({ response }) => response) .filter( @@ -211,14 +211,6 @@ function getIterateMutator(step: JobStepDef, mapOptions: MapOptions) { } } -const prepareAction = (action: Action, meta: Meta) => ({ - ...action, - meta: { - ...meta, - ...(action.meta?.queue ?? meta?.queue ? { queue: true } : {}), - }, -}) - export function getPrevStepId( index: number, steps: (JobStepDef | JobStepDef[])[], @@ -232,50 +224,12 @@ export function getPrevStepId( : prevStep?.id } -// Run the action for every item in the `payload.data` array -async function runIteration( - items: unknown[], - action: Action, - id: string, - concurrency: number, - runOneAction: (action: Action) => Promise, -) { - const actions = items.map((item) => setDataOnActionPayload(action, item)) - const limit = pLimit(concurrency) - return Object.fromEntries( - ( - await Promise.all( - actions.map((action) => limit(() => runOneAction(action))), - ) - ).map((response, index) => [ - `${id}_${index}`, - setOriginOnAction(response, `${id}_${index}`), - ]), // Set the id of each response as key for the object to be created - ) -} - -// Run all sub steps in parallel -async function runSubSteps( - subSteps: Step[], - id: string, - actionResponses: Record, - dispatch: HandlerDispatch, - meta: Meta, -) { - // TODO: Actually run all parallel steps, even if one fails - const arrayOfResponses = await Promise.all( - subSteps.map((step) => step.run(meta, actionResponses, dispatch)), - ) - const doBreak = arrayOfResponses.some( - (response) => response[breakSymbol], // eslint-disable-line security/detect-object-injection - ) - const responsesObj = Object.fromEntries( - arrayOfResponses.flatMap((responses) => Object.entries(responses)), - ) - const thisStep = responseFromSteps(responsesObj) - return thisStep - ? { ...responsesObj, [id]: thisStep, [breakSymbol]: doBreak } - : { ...responsesObj, [breakSymbol]: doBreak } +async function runOneAction(action: Action, dispatch: HandlerDispatch) { + try { + return setResponseOnAction(action, await dispatch(action)) + } catch (error) { + return setErrorOnAction(action, error) + } } export default class Step { @@ -320,32 +274,110 @@ export default class Step { } } - runAction( + /** + * Run the given action for every item in the items array. + */ + async runIteration( actionResponses: Record, dispatch: HandlerDispatch, meta: Meta, - ): (rawAction: Action) => Promise { - return async (rawAction) => { - const action = prepareAction( - await mutateAction(rawAction, this.#premutator, actionResponses), - meta, - ) + ) { + if (!this.#iterateMutator || !this.#action) { + return undefined // Return undefined when there's no iterate mutator or action + } + const action = setMetaOnAction(this.#action, meta) - let response - try { - response = await dispatch(action) - } catch (error) { - response = { - status: 'error', - error: error instanceof Error ? error.message : String(error), - } - } - return setResponseOnAction(action, response) + const items = ensureArray(await this.#iterateMutator(actionResponses)) + const actions = items.map((item) => setDataOnActionPayload(action, item)) + const limit = pLimit(this.#iterateConcurrency ?? Infinity) + + return Object.fromEntries( + ( + await Promise.all( + actions.map((action) => + limit(async () => + runOneAction( + await mutateAction(action, this.#premutator, actionResponses), + dispatch, + ), + ), + ), + ) + ).map((response, index) => [ + `${this.id}_${index}`, // Set the id of each response as key for the object to be created + setOriginOnAction(response, `${this.id}_${index}`), + ]), + ) + } + + /** + * Run the action for this step. + */ + async runAction( + actionResponses: Record, + dispatch: HandlerDispatch, + meta: Meta, + ): Promise { + if (!this.#action) { + return {} // No action, return empty response object + } + const action = setMetaOnAction(this.#action, meta) + + // Run the action for every item in the array return by the iterate mutator. + const responses = await this.runIteration(actionResponses, dispatch, meta) + + // If we got any responses, combine them into one response. Otherwise + // just run the action, as no responses means there were no iterate mutator, + // so nothing has been run yet. + const responseAction = responses + ? generateIterateResponse(action, responses) + : await runOneAction( + await mutateAction(action, this.#premutator, actionResponses), + dispatch, + ) + + // Mutate the response and return together with any individual responses + return { + ...responses, + [this.id]: await mutateResponse( + responseAction, + actionResponses, + this.id, + this.#postmutator, + ), } } /** - * Runs this step. + * Run all sub steps in parallel. + */ + async runSubSteps( + actionResponses: Record, + dispatch: HandlerDispatch, + meta: Meta, + ) { + if (!this.#subSteps) { + return {} // No sub steps, return empty response object + } + + // TODO: Actually run all parallel steps, even if one fails + const arrayOfResponses = await Promise.all( + this.#subSteps.map((step) => step.run(meta, actionResponses, dispatch)), + ) + const doBreak = arrayOfResponses.some( + (response) => response[breakSymbol], // eslint-disable-line security/detect-object-injection + ) + const responsesObj = Object.fromEntries( + arrayOfResponses.flatMap((responses) => Object.entries(responses)), + ) + const thisStep = responseFromSteps(responsesObj) + return thisStep + ? { ...responsesObj, [this.id]: thisStep, [breakSymbol]: doBreak } + : { ...responsesObj, [breakSymbol]: doBreak } + } + + /** + * Run this step. */ async run( meta: Meta, @@ -364,51 +396,11 @@ export default class Step { } } - // Validated, so let's run ... - const action = this.#action - if (action) { - // We have an action - const runOneAction = this.runAction(actionResponses, dispatch, meta) - let responses: ResponsesObject = {} - let responseAction: Action - if (this.#iterateMutator) { - // Run the action once for each item in the `payload.data` array - const items = ensureArray(await this.#iterateMutator(actionResponses)) - responses = await runIteration( - items, - action, - this.id, - this.#iterateConcurrency || Infinity, - runOneAction, - ) - responseAction = generateIterateResponse( - action, - Object.values(responses), // Combine the responses from all iterations - ) - } else { - // Simply run the action - responseAction = await runOneAction(action) - } - return { - ...responses, - [this.id]: await mutateResponse( - responseAction, - actionResponses, - this.id, - this.#postmutator, - ), - } - } else if (this.#subSteps) { - // We have sub steps, so run these steps in parallel - return await runSubSteps( - this.#subSteps, - this.id, - actionResponses, - dispatch, - meta, - ) + // Validated, so let's run the action or sub steps + if (this.#action) { + return await this.runAction(actionResponses, dispatch, meta) } else { - return {} + return await this.runSubSteps(actionResponses, dispatch, meta) } } } diff --git a/src/utils/action.test.ts b/src/utils/action.test.ts index 59981380..15489c55 100644 --- a/src/utils/action.test.ts +++ b/src/utils/action.test.ts @@ -3,6 +3,7 @@ import test from 'ava' import { createAction, setResponseOnAction, + setMetaOnAction, setErrorOnAction, setDataOnActionPayload, setOriginOnAction, @@ -138,6 +139,57 @@ test('should set response on action when no response is given', (t) => { t.deepEqual(ret, expected) }) +// Tests -- setMetaOnAction + +test('should set meta on action', (t) => { + const action = { type: 'GET', payload: { type: 'entry' } } + const meta = { ident: { id: 'johnf' }, queue: true } + const expected = { + type: 'GET', + payload: { type: 'entry' }, + meta: { ident: { id: 'johnf' }, queue: true }, + } + + const ret = setMetaOnAction(action, meta) + + t.deepEqual(ret, expected) +}) + +test('should not override queue from original action', (t) => { + const action = { + type: 'GET', + payload: { type: 'entry' }, + meta: { queue: true }, + } + const meta = { ident: { id: 'johnf' }, queue: false } + const expected = { + type: 'GET', + payload: { type: 'entry' }, + meta: { ident: { id: 'johnf' }, queue: true }, + } + + const ret = setMetaOnAction(action, meta) + + t.deepEqual(ret, expected) +}) + +test('should remove queue prop when not true', (t) => { + const action = { + type: 'GET', + payload: { type: 'entry' }, + } + const meta = { ident: { id: 'johnf' }, queue: false } + const expected = { + type: 'GET', + payload: { type: 'entry' }, + meta: { ident: { id: 'johnf' } }, + } + + const ret = setMetaOnAction(action, meta) + + t.deepEqual(ret, expected) +}) + // Tests -- setErrorOnAction test('should set error response on action object', (t) => { diff --git a/src/utils/action.ts b/src/utils/action.ts index 3901ab9a..28571f09 100644 --- a/src/utils/action.ts +++ b/src/utils/action.ts @@ -33,6 +33,16 @@ export function setResponseOnAction(action: Action, response?: Response) { return { ...action, response: response || {} } } +export function setMetaOnAction(action: Action, { queue, ...meta }: Meta) { + return { + ...action, + meta: { + ...meta, + ...(action.meta?.queue ?? queue ? { queue: true } : {}), + }, + } +} + /** * Set the general options from an endpoint on `action.meta.options`. */ @@ -49,7 +59,7 @@ export function setOptionsOnAction(action: Action, endpoint: Endpoint): Action { export function setErrorOnAction( action: Action, error: unknown, - origin: string, + origin?: string, status = 'error', ): Action { return setResponseOnAction(action, { diff --git a/src/utils/response.ts b/src/utils/response.ts index 46873685..48ed6d31 100644 --- a/src/utils/response.ts +++ b/src/utils/response.ts @@ -9,7 +9,7 @@ const debug = debugLib('great') */ export function createErrorResponse( error: unknown, - origin: string, + origin?: string, status = 'error', reason?: string, ): Response {