From 7b5cb998f3a32fd1c41d1b964e85095593d4d016 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 5 Mar 2025 18:48:10 -0800 Subject: [PATCH 1/7] default updates impl --- packages/workflow/src/interfaces.ts | 5 +++++ packages/workflow/src/internals.ts | 34 ++++++++++++++++++++++------- packages/workflow/src/workflow.ts | 24 ++++++++++++++++++++ 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index d8e4297ec..239dfb892 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -538,6 +538,11 @@ export type Handler< */ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => void | Promise; +/** + * A handler function accepting update calls for non-registered update names. + */ +export type DefaultUpdateHandler = (...args: any[]) => Promise | any; + /** * A validation function capable of accepting the arguments for a given UpdateDefinition. */ diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 5d8a5f091..3c07565b5 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -41,12 +41,14 @@ import { WorkflowInfo, WorkflowCreateOptionsInternal, ActivationCompletion, + DefaultUpdateHandler, } from './interfaces'; import { type SinkCall } from './sinks'; import { untrackPromise } from './stack-helpers'; import pkg from './pkg'; import { SdkFlag, assertValidFlag } from './flags'; import { executeWithLifecycleLogging, log } from './logs'; +import { DefaultUpdateHandler } from './interfaces'; const StartChildWorkflowExecutionFailedCause = { WORKFLOW_ALREADY_EXISTS: 'WORKFLOW_ALREADY_EXISTS', @@ -189,6 +191,11 @@ export class Activator implements ActivationHandler { */ defaultSignalHandler?: DefaultSignalHandler; + /** + * A update handler that catches calls for non-registered update names. + */ + defaultUpdateHandler?: DefaultUpdateHandler; + /** * Source map file for looking up the source files in response to __enhanced_stack_trace */ @@ -665,11 +672,16 @@ export class Activator implements ActivationHandler { if (!protocolInstanceId) { throw new TypeError('Missing activation update protocolInstanceId'); } - const entry = this.updateHandlers.get(name); - if (!entry) { + if (!this.updateHandlers.get(name) && !this.defaultUpdateHandler) { this.bufferedUpdates.push(activation); return; } + + const entry = this.updateHandlers.get(name) ?? { + handler: this.defaultUpdateHandler, + // Default to a warning policy. + unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON, + }; const makeInput = (): UpdateInput => ({ updateId, @@ -758,13 +770,19 @@ export class Activator implements ActivationHandler { public dispatchBufferedUpdates(): void { const bufferedUpdates = this.bufferedUpdates; while (bufferedUpdates.length) { - const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string)); - if (foundIndex === -1) { - // No buffered Updates have a handler yet. - break; + // We have a default update handler, so all updates are dispatchable. + if (this.defaultUpdateHandler) { + const update = bufferedUpdates.shift(); + this.doUpdate(update); + } else { + const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string)); + if (foundIndex === -1) { + // No buffered Updates have a handler yet. + break; + } + const [update] = bufferedUpdates.splice(foundIndex, 1); + this.doUpdate(update); } - const [update] = bufferedUpdates.splice(foundIndex, 1); - this.doUpdate(update); } } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 3c7353899..cf904b5f6 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -50,6 +50,7 @@ import { UpdateInfo, encodeChildWorkflowCancellationType, encodeParentClosePolicy, + DefaultUpdateHandler, } from './interfaces'; import { LocalActivityDoBackoff } from './errors'; import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes'; @@ -1318,6 +1319,29 @@ export function setDefaultSignalHandler(handler: DefaultSignalHandler | undefine } } +/** + * Set a update handler function that will handle updates calls for non-registered update names. + * + * Updates are dispatched to the default update handler in the order that they were accepted by the server. + * + * If this function is called multiple times for a given update name the last handler will overwrite any previous calls. + * + * @param handler a function that will handle updates for non-registered update names, or `undefined` to unset the handler. + */ +export function setDefaultUpdateHandler(handler: DefaultUpdateHandler | undefined): void { + const activator = assertInWorkflowContext( + 'Workflow.setDefaultUpdateHandler(...) may only be used from a Workflow Execution.' + ); + if (typeof handler === 'function') { + activator.defaultUpdateHandler = handler; + activator.dispatchBufferedUpdates(); + } else if (handler == null) { + activator.defaultUpdateHandler = undefined; + } else { + throw new TypeError(`Expected handler to be either a function or 'undefined'. Got: '${typeof handler}'`); + } +} + /** * Updates this Workflow's Search Attributes by merging the provided `searchAttributes` with the existing Search * Attributes, `workflowInfo().searchAttributes`. From ea626f5164d411c7e597b42c8bf05119c2d59b1e Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 6 Mar 2025 16:45:24 -0800 Subject: [PATCH 2/7] added tests, small fix, and some formatting --- packages/test/src/test-workflows.ts | 258 ++++++++++++++++++ packages/test/src/workflows/index.ts | 1 + .../test/src/workflows/updates-ordering.ts | 77 ++++++ packages/workflow/src/interfaces.ts | 2 +- packages/workflow/src/internals.ts | 17 +- 5 files changed, 348 insertions(+), 7 deletions(-) create mode 100644 packages/test/src/workflows/updates-ordering.ts diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 2eb10bbbd..7bfc60332 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -363,6 +363,24 @@ function makeSetPatchMarker(myPatchId: string, deprecated: boolean): coresdk.wor }; } +function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand { + return { + updateResponse: { + protocolInstanceId: id, + accepted: {}, + }, + }; +} + +function makeUpdateCompleteResponse(id: string, result: unknown): coresdk.workflow_commands.IWorkflowCommand { + return { + updateResponse: { + protocolInstanceId: id, + completed: defaultPayloadConverter.toPayload(result), + }, + }; +} + test('random', async (t) => { const { logs, workflowType } = t.context; { @@ -2528,3 +2546,243 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11 ); } }); + +test('Buffered updates are dispatched in the correct order - updatesOrdering', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate( + t, + makeActivation( + undefined, + makeInitializeWorkflowJob(workflowType), + { + doUpdate: { + id: '1', + protocolInstanceId: '1', + name: 'non-existant', + input: toPayloads(defaultPayloadConverter, 1), + }, + }, + { + doUpdate: { + id: '2', + protocolInstanceId: '2', + name: 'updateA', + input: toPayloads(defaultPayloadConverter, 2), + }, + }, + { + doUpdate: { + id: '3', + protocolInstanceId: '3', + name: 'updateA', + input: toPayloads(defaultPayloadConverter, 3), + }, + }, + { + doUpdate: { + id: '4', + protocolInstanceId: '4', + name: 'updateC', + input: toPayloads(defaultPayloadConverter, 4), + }, + }, + { + doUpdate: { + id: '5', + protocolInstanceId: '5', + name: 'updateB', + input: toPayloads(defaultPayloadConverter, 5), + }, + }, + { + doUpdate: { + id: '6', + protocolInstanceId: '6', + name: 'non-existant', + input: toPayloads(defaultPayloadConverter, 6), + }, + }, + { + doUpdate: { + id: '7', + protocolInstanceId: '7', + name: 'updateB', + input: toPayloads(defaultPayloadConverter, 7), + }, + } + ) + ); + + // The activation above: + // - initializes the workflow + // - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered) + // - enters the workflow code + // - workflow code sets handler for updateA + // - handler is registered for updateA + // - we attempt to dispatch buffered updates + // - buffered updates for handler A are *accepted* but not executed + // (executing an update is a promise/async, so it instead goes on the node event queue) + // - we continue/re-enter the workflow code + // - ...and do the same pattern for updateB, the default update handler, the updateC + // - once updates have been accepted, node processes the waiting events in its queue (the waiting updates) + // - these are processesed in FIFO order, so we get execution for updateA, then updateB, the default handler, then updateC + + // As such, the expected order of these updates is the order that the handlers were registered. + // Note that because the default handler was registered *before* updateC, all remaining buffered updates were dispatched + // to it, including the update for updateC. + + compareCompletion( + t, + completion, + makeSuccess( + [ + // FIFO accepted order + makeUpdateAcceptedResponse('2'), + makeUpdateAcceptedResponse('3'), + makeUpdateAcceptedResponse('5'), + makeUpdateAcceptedResponse('7'), + makeUpdateAcceptedResponse('1'), + makeUpdateAcceptedResponse('4'), + makeUpdateAcceptedResponse('6'), + // FIFO executed order + makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }), + makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }), + makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }), + makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }), + makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }), + // updateC handled by default handler. + makeUpdateCompleteResponse('4', { handler: 'default', updateName: 'updateC', args: [4] }), + makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }), + // No expected update response from updateC handler + makeCompleteWorkflowExecution(), + ] + // [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); + +test('Buffered updates are reentrant - updatesAreReentrant', async (t) => { + const { workflowType } = t.context; + { + const completion = await activate( + t, + makeActivation( + undefined, + makeInitializeWorkflowJob(workflowType), + { + doUpdate: { + id: '1', + protocolInstanceId: '1', + name: 'non-existant', + input: toPayloads(defaultPayloadConverter, 1), + }, + }, + { + doUpdate: { + id: '2', + protocolInstanceId: '2', + name: 'updateA', + input: toPayloads(defaultPayloadConverter, 2), + }, + }, + { + doUpdate: { + id: '3', + protocolInstanceId: '3', + name: 'updateA', + input: toPayloads(defaultPayloadConverter, 3), + }, + }, + { + doUpdate: { + id: '4', + protocolInstanceId: '4', + name: 'updateC', + input: toPayloads(defaultPayloadConverter, 4), + }, + }, + { + doUpdate: { + id: '5', + protocolInstanceId: '5', + name: 'updateB', + input: toPayloads(defaultPayloadConverter, 5), + }, + }, + { + doUpdate: { + id: '6', + protocolInstanceId: '6', + name: 'non-existant', + input: toPayloads(defaultPayloadConverter, 6), + }, + }, + { + doUpdate: { + id: '7', + protocolInstanceId: '7', + name: 'updateB', + input: toPayloads(defaultPayloadConverter, 7), + }, + }, + { + doUpdate: { + id: '8', + protocolInstanceId: '8', + name: 'updateC', + input: toPayloads(defaultPayloadConverter, 8), + }, + } + ) + ); + + // The activation above: + // - initializes the workflow + // - buffers all its updates (we attempt update jobs first, but since there are no handlers, they get buffered) + // - enters the workflow code + // - workflow code sets handler for updateA + // - handler is registered for updateA + // - we attempt to dispatch buffered updates + // - buffered updates for handler A are *accepted* but not executed + // (executing an update is a promise/async, so it instead goes on the node event queue) + // - however, there is no more workflow code, node dequues event queue and we immediately run the update handler + // (we begin executing the update which...) + // - deletes the current handler and registers the next one (updateB) + // - this pattern repeats (updateA -> updateB -> updateC -> default) until there are no more updates to handle + // - at this point, all updates have been accepted and are executing + // - due to the call order in the workflow, the completion order of the updates follows the call stack, LIFO + + // This workflow is interesting in that updates are accepted FIFO, but executed LIFO + + compareCompletion( + t, + completion, + makeSuccess( + [ + // FIFO accepted order + makeUpdateAcceptedResponse('2'), + makeUpdateAcceptedResponse('5'), + makeUpdateAcceptedResponse('4'), + makeUpdateAcceptedResponse('1'), + makeUpdateAcceptedResponse('3'), + makeUpdateAcceptedResponse('7'), + makeUpdateAcceptedResponse('8'), + makeUpdateAcceptedResponse('6'), + // LIFO executed order + makeUpdateCompleteResponse('6', { handler: 'default', updateName: 'non-existant', args: [6] }), + makeUpdateCompleteResponse('8', { handler: 'updateC', args: [8] }), + makeUpdateCompleteResponse('7', { handler: 'updateB', args: [7] }), + makeUpdateCompleteResponse('3', { handler: 'updateA', args: [3] }), + makeUpdateCompleteResponse('1', { handler: 'default', updateName: 'non-existant', args: [1] }), + makeUpdateCompleteResponse('4', { handler: 'updateC', args: [4] }), + makeUpdateCompleteResponse('5', { handler: 'updateB', args: [5] }), + makeUpdateCompleteResponse('2', { handler: 'updateA', args: [2] }), + makeCompleteWorkflowExecution(), + ] + // [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } +}); diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index cc01d985f..3957d0680 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -89,3 +89,4 @@ export * from './upsert-and-read-search-attributes'; export * from './wait-on-user'; export * from './workflow-cancellation-scenarios'; export * from './upsert-and-read-memo'; +export * from './updates-ordering'; diff --git a/packages/test/src/workflows/updates-ordering.ts b/packages/test/src/workflows/updates-ordering.ts new file mode 100644 index 000000000..cb566510d --- /dev/null +++ b/packages/test/src/workflows/updates-ordering.ts @@ -0,0 +1,77 @@ +import { defineUpdate, setDefaultUpdateHandler, setHandler } from '@temporalio/workflow'; + +const updateA = defineUpdate('updateA'); +const updateB = defineUpdate('updateB'); +const updateC = defineUpdate('updateC'); + +interface ProcessedUpdate { + handler: string; + updateName?: string; + args: unknown[]; +} + +/* + There's a surprising amount going on with the workflow below. Let's simplify it to just updateA and updateB + (no updateC or the default) and walk through it. + + 1. setHandler for updateA + - this is all synchronous code until we run `UpdateScope.updateWithInfo(updateId, name, doUpdateImpl)`, + which calls `doUpdateImpl` which is promise/async, so... + 2. queue doUpdateImpl for A on node event queue: [doUpdateImplA] + 3. continue running the workflow code (currently running code, we aren't awaiting the promise) + 4. setHandler for updateB + - same deal as A + 5. queue doUpdateImpl for B on node event queue: [doUpdateImplA, doUpdateImplB] + 6. finished workflow code, go through the event queue + 7. doUpdateImplA + - synchronous until we get to `execute`, which means we've accepted the update, command ordering [acceptA] + 8. `execute` returns a promise, add it to the node event queue: [doUpdateImplB, executeA] + 9. doUpdateImplB + - same deal as A, command ordering [acceptA, acceptB] + - `execute` returns promise, node event queue [executeA, executeB] + 10. execute update A, node event queue [executeB], command ordering [acceptA, acceptB, executeA] + 11. execute update B, node event queue [] (empty), command ordering [acceptA, acceptB, executeA, executeB] + + The only additional complexity with the workflow below is that once the default handler is registered, buffered updates for C will be + dispatched to the default handler. So in this scenario: C1, C2 -> default registered -> C registered, both C1 and C2 will be dispatched + to the default handler. +*/ +export async function updatesOrdering(): Promise { + setHandler(updateA, (...args: any[]) => { + return { handler: 'updateA', args }; + }); + setHandler(updateB, (...args: any[]) => { + return { handler: 'updateB', args }; + }); + setDefaultUpdateHandler((updateName, ...args: any[]) => { + return { handler: 'default', updateName, args }; + }); + setHandler(updateC, (...args: any[]) => { + return { handler: 'updateC', args }; + }); +} + +export async function updatesAreReentrant(): Promise { + function handlerA(...args: any[]) { + setHandler(updateA, undefined); + setHandler(updateB, handlerB); + return { handler: 'updateA', args }; + } + function handlerB(...args: any[]) { + setHandler(updateB, undefined); + setHandler(updateC, handlerC); + return { handler: 'updateB', args }; + } + function handlerC(...args: any[]) { + setHandler(updateC, undefined); + setDefaultUpdateHandler(defaultHandler); + return { handler: 'updateC', args }; + } + function defaultHandler(updateName: string, ...args: any[]) { + setDefaultUpdateHandler(undefined); + setHandler(updateA, handlerA); + return { handler: 'default', updateName, args }; + } + + setHandler(updateA, handlerA); +} diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 239dfb892..4e131bec5 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -541,7 +541,7 @@ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => v /** * A handler function accepting update calls for non-registered update names. */ -export type DefaultUpdateHandler = (...args: any[]) => Promise | any; +export type DefaultUpdateHandler = (updateName: string, ...args: any[]) => Promise | any; /** * A validation function capable of accepting the arguments for a given UpdateDefinition. diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 3c07565b5..ed93d8cfa 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -48,7 +48,6 @@ import { untrackPromise } from './stack-helpers'; import pkg from './pkg'; import { SdkFlag, assertValidFlag } from './flags'; import { executeWithLifecycleLogging, log } from './logs'; -import { DefaultUpdateHandler } from './interfaces'; const StartChildWorkflowExecutionFailedCause = { WORKFLOW_ALREADY_EXISTS: 'WORKFLOW_ALREADY_EXISTS', @@ -676,11 +675,15 @@ export class Activator implements ActivationHandler { this.bufferedUpdates.push(activation); return; } - + const entry = this.updateHandlers.get(name) ?? { - handler: this.defaultUpdateHandler, - // Default to a warning policy. - unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON, + // Logically, this must be defined as we got passed the conditional above + // pushing to the buffer. But Typescript doesn't know that so we use a + // non-null assertion (!). + handler: (...args: any[]) => this.defaultUpdateHandler!(name, ...args), + validator: undefined, + // Default to a warning policy. + unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON, }; const makeInput = (): UpdateInput => ({ @@ -773,7 +776,9 @@ export class Activator implements ActivationHandler { // We have a default update handler, so all updates are dispatchable. if (this.defaultUpdateHandler) { const update = bufferedUpdates.shift(); - this.doUpdate(update); + // Logically, this must be defined as we're in the loop. + // But Typescript doesn't know that so we use a non-null assertion (!). + this.doUpdate(update!); } else { const foundIndex = bufferedUpdates.findIndex((update) => this.updateHandlers.has(update.name as string)); if (foundIndex === -1) { From e5be9d090c17a7abb68a4f3083fdbcca3fc4c864 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 6 Mar 2025 17:01:20 -0800 Subject: [PATCH 3/7] small helper to make update activation jobs, makes the diff much smaller after formatting --- packages/test/src/test-workflows.ts | 187 ++++++---------------------- 1 file changed, 35 insertions(+), 152 deletions(-) diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 7bfc60332..6e667dc18 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -363,6 +363,22 @@ function makeSetPatchMarker(myPatchId: string, deprecated: boolean): coresdk.wor }; } +function makeUpdateActivationJob( + id: string, + protocolInstanceId: string, + name: string, + input: unknown +): coresdk.workflow_activation.IWorkflowActivationJob { + return { + doUpdate: { + id, + protocolInstanceId, + name, + input: toPayloads(defaultPayloadConverter, input), + }, + }; +} + function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand { return { updateResponse: { @@ -2420,23 +2436,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - pre- ...makeActivation( undefined, makeSignalWorkflowJob('aaSignal', ['signal1']), - { - doUpdate: { - id: 'first', - name: 'aaUpdate', - protocolInstanceId: '1', - input: toPayloads(defaultPayloadConverter, ['update1']), - }, - }, + makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), makeSignalWorkflowJob('aaSignal', ['signal2']), - { - doUpdate: { - id: 'second', - name: 'aaUpdate', - protocolInstanceId: '2', - input: toPayloads(defaultPayloadConverter, ['update2']), - }, - }, + makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), makeFireTimerJob(1), makeResolveActivityJob(1, { completed: {} }) ), @@ -2499,23 +2501,9 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11 ...makeActivation( undefined, makeSignalWorkflowJob('aaSignal', ['signal1']), - { - doUpdate: { - id: 'first', - name: 'aaUpdate', - protocolInstanceId: '1', - input: toPayloads(defaultPayloadConverter, ['update1']), - }, - }, + makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), makeSignalWorkflowJob('aaSignal', ['signal2']), - { - doUpdate: { - id: 'second', - name: 'aaUpdate', - protocolInstanceId: '2', - input: toPayloads(defaultPayloadConverter, ['update2']), - }, - }, + makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), makeFireTimerJob(1), makeResolveActivityJob(1, { completed: {} }) ), @@ -2555,62 +2543,13 @@ test('Buffered updates are dispatched in the correct order - updatesOrdering', a makeActivation( undefined, makeInitializeWorkflowJob(workflowType), - { - doUpdate: { - id: '1', - protocolInstanceId: '1', - name: 'non-existant', - input: toPayloads(defaultPayloadConverter, 1), - }, - }, - { - doUpdate: { - id: '2', - protocolInstanceId: '2', - name: 'updateA', - input: toPayloads(defaultPayloadConverter, 2), - }, - }, - { - doUpdate: { - id: '3', - protocolInstanceId: '3', - name: 'updateA', - input: toPayloads(defaultPayloadConverter, 3), - }, - }, - { - doUpdate: { - id: '4', - protocolInstanceId: '4', - name: 'updateC', - input: toPayloads(defaultPayloadConverter, 4), - }, - }, - { - doUpdate: { - id: '5', - protocolInstanceId: '5', - name: 'updateB', - input: toPayloads(defaultPayloadConverter, 5), - }, - }, - { - doUpdate: { - id: '6', - protocolInstanceId: '6', - name: 'non-existant', - input: toPayloads(defaultPayloadConverter, 6), - }, - }, - { - doUpdate: { - id: '7', - protocolInstanceId: '7', - name: 'updateB', - input: toPayloads(defaultPayloadConverter, 7), - }, - } + makeUpdateActivationJob('1', '1', 'non-existant', 1), + makeUpdateActivationJob('2', '2', 'updateA', 2), + makeUpdateActivationJob('3', '3', 'updateA', 3), + makeUpdateActivationJob('4', '4', 'updateC', 4), + makeUpdateActivationJob('5', '5', 'updateB', 5), + makeUpdateActivationJob('6', '6', 'non-existant', 6), + makeUpdateActivationJob('7', '7', 'updateB', 7) ) ); @@ -2671,70 +2610,14 @@ test('Buffered updates are reentrant - updatesAreReentrant', async (t) => { makeActivation( undefined, makeInitializeWorkflowJob(workflowType), - { - doUpdate: { - id: '1', - protocolInstanceId: '1', - name: 'non-existant', - input: toPayloads(defaultPayloadConverter, 1), - }, - }, - { - doUpdate: { - id: '2', - protocolInstanceId: '2', - name: 'updateA', - input: toPayloads(defaultPayloadConverter, 2), - }, - }, - { - doUpdate: { - id: '3', - protocolInstanceId: '3', - name: 'updateA', - input: toPayloads(defaultPayloadConverter, 3), - }, - }, - { - doUpdate: { - id: '4', - protocolInstanceId: '4', - name: 'updateC', - input: toPayloads(defaultPayloadConverter, 4), - }, - }, - { - doUpdate: { - id: '5', - protocolInstanceId: '5', - name: 'updateB', - input: toPayloads(defaultPayloadConverter, 5), - }, - }, - { - doUpdate: { - id: '6', - protocolInstanceId: '6', - name: 'non-existant', - input: toPayloads(defaultPayloadConverter, 6), - }, - }, - { - doUpdate: { - id: '7', - protocolInstanceId: '7', - name: 'updateB', - input: toPayloads(defaultPayloadConverter, 7), - }, - }, - { - doUpdate: { - id: '8', - protocolInstanceId: '8', - name: 'updateC', - input: toPayloads(defaultPayloadConverter, 8), - }, - } + makeUpdateActivationJob('1', '1', 'non-existant', 1), + makeUpdateActivationJob('2', '2', 'updateA', 2), + makeUpdateActivationJob('3', '3', 'updateA', 3), + makeUpdateActivationJob('4', '4', 'updateC', 4), + makeUpdateActivationJob('5', '5', 'updateB', 5), + makeUpdateActivationJob('6', '6', 'non-existant', 6), + makeUpdateActivationJob('7', '7', 'updateB', 7), + makeUpdateActivationJob('8', '8', 'updateC', 8) ) ); From 7e9971a7fc0841b53f1ca617af8db83154f87b6f Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 7 Mar 2025 12:01:05 -0800 Subject: [PATCH 4/7] small test helper change --- packages/test/src/test-workflows.ts | 34 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 6e667dc18..94a29c961 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -367,14 +367,14 @@ function makeUpdateActivationJob( id: string, protocolInstanceId: string, name: string, - input: unknown + input: unknown[] ): coresdk.workflow_activation.IWorkflowActivationJob { return { doUpdate: { id, protocolInstanceId, name, - input: toPayloads(defaultPayloadConverter, input), + input: toPayloads(defaultPayloadConverter, ...input), }, }; } @@ -2543,13 +2543,13 @@ test('Buffered updates are dispatched in the correct order - updatesOrdering', a makeActivation( undefined, makeInitializeWorkflowJob(workflowType), - makeUpdateActivationJob('1', '1', 'non-existant', 1), - makeUpdateActivationJob('2', '2', 'updateA', 2), - makeUpdateActivationJob('3', '3', 'updateA', 3), - makeUpdateActivationJob('4', '4', 'updateC', 4), - makeUpdateActivationJob('5', '5', 'updateB', 5), - makeUpdateActivationJob('6', '6', 'non-existant', 6), - makeUpdateActivationJob('7', '7', 'updateB', 7) + makeUpdateActivationJob('1', '1', 'non-existant', [1]), + makeUpdateActivationJob('2', '2', 'updateA', [2]), + makeUpdateActivationJob('3', '3', 'updateA', [3]), + makeUpdateActivationJob('4', '4', 'updateC', [4]), + makeUpdateActivationJob('5', '5', 'updateB', [5]), + makeUpdateActivationJob('6', '6', 'non-existant', [6]), + makeUpdateActivationJob('7', '7', 'updateB', [7]) ) ); @@ -2610,14 +2610,14 @@ test('Buffered updates are reentrant - updatesAreReentrant', async (t) => { makeActivation( undefined, makeInitializeWorkflowJob(workflowType), - makeUpdateActivationJob('1', '1', 'non-existant', 1), - makeUpdateActivationJob('2', '2', 'updateA', 2), - makeUpdateActivationJob('3', '3', 'updateA', 3), - makeUpdateActivationJob('4', '4', 'updateC', 4), - makeUpdateActivationJob('5', '5', 'updateB', 5), - makeUpdateActivationJob('6', '6', 'non-existant', 6), - makeUpdateActivationJob('7', '7', 'updateB', 7), - makeUpdateActivationJob('8', '8', 'updateC', 8) + makeUpdateActivationJob('1', '1', 'non-existant', [1]), + makeUpdateActivationJob('2', '2', 'updateA', [2]), + makeUpdateActivationJob('3', '3', 'updateA', [3]), + makeUpdateActivationJob('4', '4', 'updateC', [4]), + makeUpdateActivationJob('5', '5', 'updateB', [5]), + makeUpdateActivationJob('6', '6', 'non-existant', [6]), + makeUpdateActivationJob('7', '7', 'updateB', [7]), + makeUpdateActivationJob('8', '8', 'updateC', [8]) ) ); From 15b15038c7fd299ab7e32c6404d3d36fa005f333 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 10 Mar 2025 09:53:18 -0700 Subject: [PATCH 5/7] nits --- packages/workflow/src/interfaces.ts | 2 +- packages/workflow/src/internals.ts | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 4e131bec5..632bb54c9 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -541,7 +541,7 @@ export type DefaultSignalHandler = (signalName: string, ...args: unknown[]) => v /** * A handler function accepting update calls for non-registered update names. */ -export type DefaultUpdateHandler = (updateName: string, ...args: any[]) => Promise | any; +export type DefaultUpdateHandler = (updateName: string, ...args: unknown[]) => Promise | unknown; /** * A validation function capable of accepting the arguments for a given UpdateDefinition. diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index ed93d8cfa..261a1f9ec 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -671,21 +671,24 @@ export class Activator implements ActivationHandler { if (!protocolInstanceId) { throw new TypeError('Missing activation update protocolInstanceId'); } - if (!this.updateHandlers.get(name) && !this.defaultUpdateHandler) { + + const entry = + this.updateHandlers.get(name) ?? + (this.defaultUpdateHandler + ? { + handler: this.defaultUpdateHandler.bind(name), + validator: undefined, + // Default to a warning policy. + unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON, + } + : null); + + // If we don't have an entry from either source, buffer and return + if (entry === null) { this.bufferedUpdates.push(activation); return; } - const entry = this.updateHandlers.get(name) ?? { - // Logically, this must be defined as we got passed the conditional above - // pushing to the buffer. But Typescript doesn't know that so we use a - // non-null assertion (!). - handler: (...args: any[]) => this.defaultUpdateHandler!(name, ...args), - validator: undefined, - // Default to a warning policy. - unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON, - }; - const makeInput = (): UpdateInput => ({ updateId, args: arrayFromPayloads(this.payloadConverter, activation.input), From 0f8f50844fcf92ff728707a0a0b582ae09d2be90 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Thu, 13 Mar 2025 15:37:06 -0400 Subject: [PATCH 6/7] test --- packages/test/src/test-workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 94a29c961..b93cb2c53 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -378,7 +378,7 @@ function makeUpdateActivationJob( }, }; } - +ddd function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand { return { updateResponse: { From c4a9487c78df42685327c46b81cdbea307140b96 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Thu, 13 Mar 2025 15:40:39 -0400 Subject: [PATCH 7/7] undo --- packages/test/src/test-workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index b93cb2c53..94a29c961 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -378,7 +378,7 @@ function makeUpdateActivationJob( }, }; } -ddd + function makeUpdateAcceptedResponse(id: string): coresdk.workflow_commands.IWorkflowCommand { return { updateResponse: {