Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kjellmorten committed Oct 28, 2023
1 parent 43376d6 commit a42ba57
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 122 deletions.
4 changes: 2 additions & 2 deletions src/jobs/Step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ test('should mutate action into several actions based on iterate pipeline', asyn
type: 'SET',
payload: { type: 'entry' },
response: { status: 'ok', data: [{ id: 'ent3', title: 'Entry 3' }] },
meta: undefined,
meta,
},
setItem_0: {
...expectedAction0,
Expand Down Expand Up @@ -782,7 +782,7 @@ test('should mutate action into several actions based on iterate path', async (t
{ id: 'ent3', title: 'Entry 3' },
],
},
meta: undefined,
meta,
},
setItem_0: {
...expectedAction0,
Expand Down
228 changes: 110 additions & 118 deletions src/jobs/Step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
setDataOnActionPayload,
setResponseOnAction,
setOriginOnAction,
setErrorOnAction,
setMetaOnAction,
} from '../utils/action.js'
import { combineResponses, setOrigin } from '../utils/response.js'
import validateFilters from '../utils/validateFilters.js'
Expand Down Expand Up @@ -172,10 +174,8 @@ function responseFromSteps(
}
}

function generateIterateResponse(
action: Action,
actionsWithResponses: Action[],
) {
function generateIterateResponse(action: Action, responses: ResponsesObject) {
const actionsWithResponses = Object.values(responses)
const errorResponses = actionsWithResponses
.map(({ response }) => response)
.filter(
Expand Down Expand Up @@ -211,14 +211,6 @@ function getIterateMutator(step: JobStepDef, mapOptions: MapOptions) {
}
}

const prepareAction = (action: Action, meta: Meta) => ({
...action,
meta: {
...meta,
...(action.meta?.queue ?? meta?.queue ? { queue: true } : {}),
},
})

export function getPrevStepId(
index: number,
steps: (JobStepDef | JobStepDef[])[],
Expand All @@ -232,50 +224,12 @@ export function getPrevStepId(
: prevStep?.id
}

// Run the action for every item in the `payload.data` array
async function runIteration(
items: unknown[],
action: Action,
id: string,
concurrency: number,
runOneAction: (action: Action) => Promise<Action>,
) {
const actions = items.map((item) => setDataOnActionPayload(action, item))
const limit = pLimit(concurrency)
return Object.fromEntries(
(
await Promise.all(
actions.map((action) => limit(() => runOneAction(action))),
)
).map((response, index) => [
`${id}_${index}`,
setOriginOnAction(response, `${id}_${index}`),
]), // Set the id of each response as key for the object to be created
)
}

// Run all sub steps in parallel
async function runSubSteps(
subSteps: Step[],
id: string,
actionResponses: Record<string, Action>,
dispatch: HandlerDispatch,
meta: Meta,
) {
// TODO: Actually run all parallel steps, even if one fails
const arrayOfResponses = await Promise.all(
subSteps.map((step) => step.run(meta, actionResponses, dispatch)),
)
const doBreak = arrayOfResponses.some(
(response) => response[breakSymbol], // eslint-disable-line security/detect-object-injection
)
const responsesObj = Object.fromEntries(
arrayOfResponses.flatMap((responses) => Object.entries(responses)),
)
const thisStep = responseFromSteps(responsesObj)
return thisStep
? { ...responsesObj, [id]: thisStep, [breakSymbol]: doBreak }
: { ...responsesObj, [breakSymbol]: doBreak }
async function runOneAction(action: Action, dispatch: HandlerDispatch) {
try {
return setResponseOnAction(action, await dispatch(action))
} catch (error) {
return setErrorOnAction(action, error)
}
}

export default class Step {
Expand Down Expand Up @@ -320,32 +274,110 @@ export default class Step {
}
}

runAction(
/**
* Run the given action for every item in the items array.
*/
async runIteration(
actionResponses: Record<string, Action>,
dispatch: HandlerDispatch,
meta: Meta,
): (rawAction: Action) => Promise<Action> {
return async (rawAction) => {
const action = prepareAction(
await mutateAction(rawAction, this.#premutator, actionResponses),
meta,
)
) {
if (!this.#iterateMutator || !this.#action) {
return undefined // Return undefined when there's no iterate mutator or action
}
const action = setMetaOnAction(this.#action, meta)

let response
try {
response = await dispatch(action)
} catch (error) {
response = {
status: 'error',
error: error instanceof Error ? error.message : String(error),
}
}
return setResponseOnAction(action, response)
const items = ensureArray(await this.#iterateMutator(actionResponses))
const actions = items.map((item) => setDataOnActionPayload(action, item))
const limit = pLimit(this.#iterateConcurrency ?? Infinity)

return Object.fromEntries(
(
await Promise.all(
actions.map((action) =>
limit(async () =>
runOneAction(
await mutateAction(action, this.#premutator, actionResponses),
dispatch,
),
),
),
)
).map((response, index) => [
`${this.id}_${index}`, // Set the id of each response as key for the object to be created
setOriginOnAction(response, `${this.id}_${index}`),
]),
)
}

/**
* Run the action for this step.
*/
async runAction(
actionResponses: Record<string, Action>,
dispatch: HandlerDispatch,
meta: Meta,
): Promise<ResponsesObject> {
if (!this.#action) {
return {} // No action, return empty response object
}
const action = setMetaOnAction(this.#action, meta)

// Run the action for every item in the array return by the iterate mutator.
const responses = await this.runIteration(actionResponses, dispatch, meta)

// If we got any responses, combine them into one response. Otherwise
// just run the action, as no responses means there were no iterate mutator,
// so nothing has been run yet.
const responseAction = responses
? generateIterateResponse(action, responses)
: await runOneAction(
await mutateAction(action, this.#premutator, actionResponses),
dispatch,
)

// Mutate the response and return together with any individual responses
return {
...responses,
[this.id]: await mutateResponse(
responseAction,
actionResponses,
this.id,
this.#postmutator,
),
}
}

/**
* Runs this step.
* Run all sub steps in parallel.
*/
async runSubSteps(
actionResponses: Record<string, Action>,
dispatch: HandlerDispatch,
meta: Meta,
) {
if (!this.#subSteps) {
return {} // No sub steps, return empty response object
}

// TODO: Actually run all parallel steps, even if one fails
const arrayOfResponses = await Promise.all(
this.#subSteps.map((step) => step.run(meta, actionResponses, dispatch)),
)
const doBreak = arrayOfResponses.some(
(response) => response[breakSymbol], // eslint-disable-line security/detect-object-injection
)
const responsesObj = Object.fromEntries(
arrayOfResponses.flatMap((responses) => Object.entries(responses)),
)
const thisStep = responseFromSteps(responsesObj)
return thisStep
? { ...responsesObj, [this.id]: thisStep, [breakSymbol]: doBreak }
: { ...responsesObj, [breakSymbol]: doBreak }
}

/**
* Run this step.
*/
async run(
meta: Meta,
Expand All @@ -364,51 +396,11 @@ export default class Step {
}
}

// Validated, so let's run ...
const action = this.#action
if (action) {
// We have an action
const runOneAction = this.runAction(actionResponses, dispatch, meta)
let responses: ResponsesObject = {}
let responseAction: Action
if (this.#iterateMutator) {
// Run the action once for each item in the `payload.data` array
const items = ensureArray(await this.#iterateMutator(actionResponses))
responses = await runIteration(
items,
action,
this.id,
this.#iterateConcurrency || Infinity,
runOneAction,
)
responseAction = generateIterateResponse(
action,
Object.values(responses), // Combine the responses from all iterations
)
} else {
// Simply run the action
responseAction = await runOneAction(action)
}
return {
...responses,
[this.id]: await mutateResponse(
responseAction,
actionResponses,
this.id,
this.#postmutator,
),
}
} else if (this.#subSteps) {
// We have sub steps, so run these steps in parallel
return await runSubSteps(
this.#subSteps,
this.id,
actionResponses,
dispatch,
meta,
)
// Validated, so let's run the action or sub steps
if (this.#action) {
return await this.runAction(actionResponses, dispatch, meta)
} else {
return {}
return await this.runSubSteps(actionResponses, dispatch, meta)
}
}
}
52 changes: 52 additions & 0 deletions src/utils/action.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import test from 'ava'
import {
createAction,
setResponseOnAction,
setMetaOnAction,
setErrorOnAction,
setDataOnActionPayload,
setOriginOnAction,
Expand Down Expand Up @@ -138,6 +139,57 @@ test('should set response on action when no response is given', (t) => {
t.deepEqual(ret, expected)
})

// Tests -- setMetaOnAction

test('should set meta on action', (t) => {
const action = { type: 'GET', payload: { type: 'entry' } }
const meta = { ident: { id: 'johnf' }, queue: true }
const expected = {
type: 'GET',
payload: { type: 'entry' },
meta: { ident: { id: 'johnf' }, queue: true },
}

const ret = setMetaOnAction(action, meta)

t.deepEqual(ret, expected)
})

test('should not override queue from original action', (t) => {
const action = {
type: 'GET',
payload: { type: 'entry' },
meta: { queue: true },
}
const meta = { ident: { id: 'johnf' }, queue: false }
const expected = {
type: 'GET',
payload: { type: 'entry' },
meta: { ident: { id: 'johnf' }, queue: true },
}

const ret = setMetaOnAction(action, meta)

t.deepEqual(ret, expected)
})

test('should remove queue prop when not true', (t) => {
const action = {
type: 'GET',
payload: { type: 'entry' },
}
const meta = { ident: { id: 'johnf' }, queue: false }
const expected = {
type: 'GET',
payload: { type: 'entry' },
meta: { ident: { id: 'johnf' } },
}

const ret = setMetaOnAction(action, meta)

t.deepEqual(ret, expected)
})

// Tests -- setErrorOnAction

test('should set error response on action object', (t) => {
Expand Down
12 changes: 11 additions & 1 deletion src/utils/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ export function setResponseOnAction(action: Action, response?: Response) {
return { ...action, response: response || {} }
}

export function setMetaOnAction(action: Action, { queue, ...meta }: Meta) {
return {
...action,
meta: {
...meta,
...(action.meta?.queue ?? queue ? { queue: true } : {}),
},
}
}

/**
* Set the general options from an endpoint on `action.meta.options`.
*/
Expand All @@ -49,7 +59,7 @@ export function setOptionsOnAction(action: Action, endpoint: Endpoint): Action {
export function setErrorOnAction(
action: Action,
error: unknown,
origin: string,
origin?: string,
status = 'error',
): Action {
return setResponseOnAction(action, {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const debug = debugLib('great')
*/
export function createErrorResponse(
error: unknown,
origin: string,
origin?: string,
status = 'error',
reason?: string,
): Response {
Expand Down

0 comments on commit a42ba57

Please sign in to comment.