From cff2f9763c42569cafca41ab6ddfb03f55014ffc Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 11 Apr 2025 15:11:26 -0700 Subject: [PATCH 01/14] Update core --- packages/core-bridge/sdk-core | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 93471ac6d..e18982ec7 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 93471ac6d8bbf62839148a1bf97ef6dfd49aead0 +Subproject commit e18982ec72be62e357a5ea418b1670c8b2fee55f From 156f609cda35d6df006a6c51b563c6b71f64dcaa Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 11 Apr 2025 16:17:28 -0700 Subject: [PATCH 02/14] Upgrade core & make compile --- packages/common/src/index.ts | 1 + packages/common/src/worker-deployments.ts | 11 ++++++++ packages/worker/src/utils.ts | 14 ++++++++++ packages/worker/src/worker-options.ts | 32 ++++++++++++++++++++++- packages/worker/src/worker.ts | 5 ++-- packages/worker/src/workflow/vm-shared.ts | 4 ++- packages/workflow/src/interfaces.ts | 12 +++++++++ 7 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 packages/common/src/worker-deployments.ts diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 1c45a5a90..04e963079 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -22,6 +22,7 @@ export * from './logger'; export * from './priority'; export * from './retry-policy'; export type { Timestamp, Duration, StringValue } from './time'; +export * from './worker-deployments'; export * from './workflow-handle'; export * from './workflow-options'; export * from './versioning-intent'; diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts new file mode 100644 index 000000000..8023be01f --- /dev/null +++ b/packages/common/src/worker-deployments.ts @@ -0,0 +1,11 @@ +/** + * Represents the version of a specific worker deployment. + * + * @experimental Worker deployments are experimental + */ +export interface WorkerDeploymentVersion { + buildId: string; + deploymentName: string; +} + +export type VersioningBehavior = 'pinned' | 'auto-upgrade'; diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index c351bdf43..467be6438 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -1,3 +1,4 @@ +import { WorkerDeploymentVersion } from '@temporalio/common'; import type { coresdk, temporal } from '@temporalio/proto'; import { IllegalStateError, ParentWorkflowInfo, RootWorkflowInfo } from '@temporalio/workflow'; @@ -29,6 +30,19 @@ export function convertToParentWorkflowType( }; } +export function convertDeploymentVersion( + v: coresdk.common.IWorkerDeploymentVersion | null | undefined +): WorkerDeploymentVersion | undefined { + if (!v) { + return undefined; + } + + return { + buildId: v.buildId ?? '', + deploymentName: v.deploymentName ?? '', + }; +} + export function convertToRootWorkflowType( root: temporal.api.common.v1.IWorkflowExecution | null | undefined ): RootWorkflowInfo | undefined { diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 44c8e1941..91e3348c7 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1,7 +1,13 @@ import * as os from 'node:os'; import * as v8 from 'node:v8'; import type { Configuration as WebpackConfiguration } from 'webpack'; -import { ActivityFunction, DataConverter, LoadedDataConverter } from '@temporalio/common'; +import { + ActivityFunction, + DataConverter, + LoadedDataConverter, + VersioningBehavior, + WorkerDeploymentVersion, +} from '@temporalio/common'; import { Duration, msOptionalToNumber, msToNumber } from '@temporalio/common/lib/time'; import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow'; import { LoggerSinks } from '@temporalio/workflow'; @@ -61,6 +67,7 @@ export interface WorkerOptions { * @default `@temporalio/worker` package name and version + checksum of workflow bundle's code * * @experimental The Worker Versioning API is still being designed. Major changes are expected. + * @deprecated Use {@link deploymentVersion} instead. */ buildId?: string; @@ -72,9 +79,32 @@ export interface WorkerOptions { * For more information, see https://docs.temporal.io/workers#worker-versioning * * @experimental The Worker Versioning API is still being designed. Major changes are expected. + * @deprecated Use {@link deploymentVersion} instead. */ useVersioning?: boolean; + /** + * Deployment options for the worker. Exclusive with `build_id` and `use_worker_versioning`. + + * @experimental Deployment based versioning is still experimental. + */ + workerDeploymentOptions?: { + /** + * The deployment version of the worker. + */ + version: WorkerDeploymentVersion; + + /** + * Whether to use deployment-based worker versioning. + */ + useWorkerVersioning: boolean; + + /** + * If specified, the default versioning behavior to use for all workflows on this worker. + */ + defaultVersioningBehavior?: VersioningBehavior; + }; + /** * The namespace this worker will connect to * diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 6a86c3d18..f10593bb8 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -69,7 +69,7 @@ import { } from './replay'; import { History, Runtime } from './runtime'; import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; -import { byteArrayToBuffer, convertToParentWorkflowType, convertToRootWorkflowType } from './utils'; +import { byteArrayToBuffer, convertDeploymentVersion, convertToParentWorkflowType, convertToRootWorkflowType } from './utils'; import { CompiledWorkerOptions, CompiledWorkerOptionsWithBuildId, @@ -1292,7 +1292,8 @@ export class Worker { // A zero value means that it was not set by the server historySize: activation.historySizeBytes.toNumber(), continueAsNewSuggested: activation.continueAsNewSuggested, - currentBuildId: activation.buildIdForCurrentTask, + currentBuildId: activation.deploymentVersionForCurrentTask?.buildId ?? '', + currentDeploymentVersion: convertDeploymentVersion(activation.deploymentVersionForCurrentTask), unsafe: { now: () => Date.now(), // re-set in initRuntime isReplaying: activation.isReplaying, diff --git a/packages/worker/src/workflow/vm-shared.ts b/packages/worker/src/workflow/vm-shared.ts index 7418b2655..165f3ce6d 100644 --- a/packages/worker/src/workflow/vm-shared.ts +++ b/packages/worker/src/workflow/vm-shared.ts @@ -16,6 +16,7 @@ import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { UnhandledRejectionError } from '../errors'; import { Workflow } from './interface'; import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input'; +import { convertDeploymentVersion } from '../utils'; // Best effort to catch unhandled rejections from workflow code. // We crash the thread if we cannot find the culprit. @@ -348,7 +349,8 @@ export abstract class BaseVMWorkflow implements Workflow { // historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown historySize: activation.historySizeBytes?.toNumber() ?? 0, continueAsNewSuggested: activation.continueAsNewSuggested ?? false, - currentBuildId: activation.buildIdForCurrentTask ?? undefined, + currentBuildId: activation.deploymentVersionForCurrentTask?.buildId ?? '', + currentDeploymentVersion: convertDeploymentVersion(activation.deploymentVersionForCurrentTask), unsafe: { ...info.unsafe, isReplaying: activation.isReplaying ?? false, diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 4847a2c37..6170f817f 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -12,6 +12,7 @@ import { TypedSearchAttributes, SearchAttributePair, Priority, + WorkerDeploymentVersion, } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; @@ -195,9 +196,20 @@ export interface WorkflowInfo { * task was completed by a worker without a Build ID. If this worker is the one executing this * task for the first time and has a Build ID set, then its ID will be used. This value may change * over the lifetime of the workflow run, but is deterministic and safe to use for branching. + * + * @deprecated Use `currentDeploymentVersion` instead */ readonly currentBuildId?: string; + /** + * The Deployment Version of the worker which executed the current Workflow Task. May be undefined + * if the task was completed by a worker without a Deployment Version. If this worker is the one + * executing this task for the first time and has a Deployment Version set, then its ID will be + * used. This value may change over the lifetime of the workflow run, but is deterministic and + * safe to use for branching. + */ + readonly currentDeploymentVersion?: WorkerDeploymentVersion; + readonly unsafe: UnsafeWorkflowInfo; /** From 48b37b9c01a8cae7337e43325fbd1cdca8cff59f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 14 Apr 2025 14:01:00 -0700 Subject: [PATCH 03/14] First test / options passthrough --- packages/test/src/helpers-integration.ts | 6 + .../src/test-worker-deployment-versioning.ts | 172 ++++++++++++++++++ .../src/workflows/deployment-versioning.ts | 16 ++ packages/worker/src/worker-options.ts | 2 + packages/workflow/src/interfaces.ts | 18 ++ packages/workflow/src/workflow.ts | 18 ++ 6 files changed, 232 insertions(+) create mode 100644 packages/test/src/test-worker-deployment-versioning.ts create mode 100644 packages/test/src/workflows/deployment-versioning.ts diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 7abcbacd7..78372d6c5 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -41,6 +41,7 @@ const defaultDynamicConfigOptions = [ 'frontend.workerVersioningDataAPIs=true', 'frontend.workerVersioningWorkflowAPIs=true', 'system.enableActivityEagerExecution=true', + 'system.enableDeploymentVersions=true', 'system.enableEagerWorkflowStart=true', 'system.forceSearchAttributesCacheRefreshOnRead=true', 'worker.buildIdScavengerEnabled=true', @@ -100,6 +101,11 @@ export async function createLocalTestEnvironment( ...(opts || {}), // Use provided options or default to an empty object server: { searchAttributes: Object.values(defaultSAKeys), + // TODO: Remove after next CLI release + executable: { + type: 'cached-download', + version: 'v1.3.1-priority.0', + }, ...(opts?.server || {}), // Use provided server options or default to an empty object extraArgs: [ ...defaultDynamicConfigOptions.flatMap((opt) => ['--dynamic-config-value', opt]), diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts new file mode 100644 index 000000000..4584b8827 --- /dev/null +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -0,0 +1,172 @@ +/** + * Tests worker-deployment-based versioning + * + * @module + */ +import assert from 'assert'; +import { randomUUID } from 'crypto'; +import asyncRetry from 'async-retry'; +import { Client } from '@temporalio/client'; +import { Worker } from './helpers'; +import * as activities from './activities'; +import { WorkerDeploymentVersion } from '@temporalio/common'; +import { makeTestFunction } from './helpers-integration'; + +const test = makeTestFunction({ workflowsPath: __filename }); + +test('Worker deployment based versioning', async (t) => { + const taskQueue = 'worker-deployment-based-versioning-' + randomUUID(); + const deploymentName = 'deployment-' + randomUUID(); + const client = t.context.env.client; + + const w1DeploymentVersion = { + buildId: '1.0', + deploymentName: deploymentName, + }; + const w2DeploymentVersion = { + buildId: '2.0', + deploymentName: deploymentName, + }; + const w3DeploymentVersion = { + buildId: '3.0', + deploymentName: deploymentName, + }; + + const worker1 = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: w1DeploymentVersion, + }, + }); + const worker1Promise = worker1.run(); + worker1Promise.catch((err) => { + t.fail('Worker 1.0 run error: ' + JSON.stringify(err)); + }); + + const worker2 = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: w2DeploymentVersion, + }, + }); + const worker2Promise = worker2.run(); + worker2Promise.catch((err) => { + t.fail('Worker 2.0 run error: ' + JSON.stringify(err)); + }); + + const worker3 = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: w3DeploymentVersion, + }, + }); + const worker3Promise = worker3.run(); + worker3Promise.catch((err) => { + t.fail('Worker 3.0 run error: ' + JSON.stringify(err)); + }); + + // Wait for worker 1 to be visible and set as current version + const describeResp1 = await waitUntilWorkerDeploymentVisible(client, w1DeploymentVersion); + await setCurrentDeploymentVersion(client, describeResp1.conflictToken, w1DeploymentVersion); + + // Start workflow 1 which will use the 1.0 worker on auto-upgrade + const wf1 = await client.workflow.start('autoUpgradeWorkflow', { + taskQueue, + workflowId: 'basic-versioning-v1-' + randomUUID(), + }); + const state1 = await wf1.query('state'); + assert.equal(state1, 'v1'); + + // Wait for worker 2 to be visible and set as current version + const describeResp2 = await waitUntilWorkerDeploymentVisible(client, w2DeploymentVersion); + await setCurrentDeploymentVersion(client, describeResp2.conflictToken, w2DeploymentVersion); + + // Start workflow 2 which will use the 2.0 worker pinned + const wf2 = await client.workflow.start('pinnedWorkflow', { + taskQueue, + workflowId: 'basic-versioning-v2-' + randomUUID(), + }); + const state2 = await wf2.query('state'); + assert.equal(state2, 'v2'); + + // Wait for worker 3 to be visible and set as current version + const describeResp3 = await waitUntilWorkerDeploymentVisible(client, w3DeploymentVersion); + await setCurrentDeploymentVersion(client, describeResp3.conflictToken, w3DeploymentVersion); + + // Start workflow 3 which will use the 3.0 worker on auto-upgrade + const wf3 = await client.workflow.start('autoUpgradeWorkflow', { + taskQueue, + workflowId: 'basic-versioning-v3-' + randomUUID(), + }); + const state3 = await wf3.query('state'); + assert.equal(state3, 'v3'); + + // Signal all workflows to finish + await wf1.signal('doFinish'); + await wf2.signal('doFinish'); + await wf3.signal('doFinish'); + + const res1 = await wf1.result(); + const res2 = await wf2.result(); + const res3 = await wf3.result(); + + assert.equal(res1, 'version-v3'); + assert.equal(res2, 'version-v2'); + assert.equal(res3, 'version-v3'); + + worker1.shutdown(); + worker2.shutdown(); + worker3.shutdown(); + await worker1Promise; + await worker2Promise; + await worker3Promise; + t.pass(); +}); + +async function waitUntilWorkerDeploymentVisible(client: Client, version: WorkerDeploymentVersion) { + return await asyncRetry( + async () => { + try { + const resp = await client.workflowService.describeWorkerDeployment({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + }); + + const isVersionVisible = resp.workerDeploymentInfo!.versionSummaries!.some( + (vs) => vs.version === version.buildId + ); + + if (!isVersionVisible) { + throw new Error('Version not visible yet'); + } + + return resp; + } catch (error) { + throw error; + } + }, + { maxTimeout: 1000, retries: 10 } + ); +} + +async function setCurrentDeploymentVersion( + client: Client, + conflictToken: Uint8Array, + version: WorkerDeploymentVersion +) { + return await client.workflowService.setWorkerDeploymentCurrentVersion({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + version: version.buildId, + conflictToken, + }); +} diff --git a/packages/test/src/workflows/deployment-versioning.ts b/packages/test/src/workflows/deployment-versioning.ts new file mode 100644 index 000000000..5c9312c66 --- /dev/null +++ b/packages/test/src/workflows/deployment-versioning.ts @@ -0,0 +1,16 @@ +import { CancelledFailure, defineQuery, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { unblockSignal } from './definitions'; + +export const versionQuery = defineQuery('version'); + +export const deploymentVersioningV1AutoUpgrade = defineWorkflowWithOptions( + { versioningBehavior: 'auto-upgrade' }, + _deploymentVersioningV1AutoUpgrade +); +async function _deploymentVersioningV1AutoUpgrade(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v1'); + await condition(() => doFinish); + return 'version-v1'; +} diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 91e3348c7..4a1219dd3 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -101,6 +101,8 @@ export interface WorkerOptions { /** * If specified, the default versioning behavior to use for all workflows on this worker. + * If not specified, and `useWorkerVersioning` is true, workflows that do not specify a + * versioning behavior via {@link TODO} will cause an error to be thrown on startup. */ defaultVersioningBehavior?: VersioningBehavior; }; diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 6170f817f..63b82efaf 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -13,6 +13,7 @@ import { SearchAttributePair, Priority, WorkerDeploymentVersion, + VersioningBehavior, } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; @@ -636,3 +637,20 @@ export interface ActivationCompletion { commands: coresdk.workflow_commands.IWorkflowCommand[]; usedInternalFlags: number[]; } + +/** + * Options that can be used when defining a workflow via {@link defineWorkflowWithOptions}. + */ +export interface WorkflowDefinitionOptions { + versioningBehavior?: VersioningBehavior; +} + +type AsyncFunction = (...args: Args) => Promise; + +/** + * A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}. + */ +export interface WorkflowFunctionWithOptions extends AsyncFunction { + __temporal_is_workflow_function_with_options: true; + options: WorkflowDefinitionOptions; +} diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 54b3069a0..3de31f7c4 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -59,6 +59,8 @@ import { encodeParentClosePolicy, DefaultUpdateHandler, DefaultQueryHandler, + WorkflowDefinitionOptions, + WorkflowFunctionWithOptions, } from './interfaces'; import { LocalActivityDoBackoff } from './errors'; import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes'; @@ -1614,6 +1616,22 @@ export function allHandlersFinished(): boolean { return activator.inProgressSignals.size === 0 && activator.inProgressUpdates.size === 0; } +/** +* Can be used to define workflow functions with certain options specified at definition time. + +* @param options Options for the workflow defintion. +* @param fn The workflow function. +* @returns The same passed in workflow function, with the specified options applied. You can export +* this function to make it available as a workflow function. +*/ +export function defineWorkflowWithOptions( + options: WorkflowDefinitionOptions, + fn: (...args: A) => Promise +): WorkflowFunctionWithOptions { + const wrappedFn = Object.assign(fn, { options, __temporal_is_workflow_function_with_options: true as const }); + return wrappedFn; +} + export const stackTraceQuery = defineQuery('__stack_trace'); export const enhancedStackTraceQuery = defineQuery('__enhanced_stack_trace'); export const workflowMetadataQuery = defineQuery('__temporal_workflow_metadata'); From 5f280e4d891bc2f8cf3eac1b17e879c7e9dc2d37 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 14 Apr 2025 14:43:39 -0700 Subject: [PATCH 04/14] Need a way to run against extant server --- packages/test/src/helpers-integration.ts | 9 ++++++++- packages/test/src/helpers.ts | 7 +++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 78372d6c5..b7ddfd740 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -140,7 +140,14 @@ export function makeTestFunction(opts: { return makeConfigurableEnvironmentTestFn({ recordedLogs: opts.recordedLogs, createTestContext: async (_t: ExecutionContext): Promise => { - const env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts); + let env: TestWorkflowEnvironment; + if (process.env.TEMPORAL_SERVICE_ADDRESS) { + env = await TestWorkflowEnvironment.createExistingServer({ + address: process.env.TEMPORAL_SERVICE_ADDRESS, + }); + } else { + env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts); + } return { workflowBundle: await createTestWorkflowBundle({ workflowsPath: opts.workflowsPath, diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index 0190ecd54..d15bbea91 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -11,6 +11,7 @@ import { Payload, PayloadCodec } from '@temporalio/common'; import { historyToJSON } from '@temporalio/common/lib/proto-utils'; import * as iface from '@temporalio/proto'; import { + ExistingServerTestWorkflowEnvironmentOptions, LocalTestWorkflowEnvironmentOptions, TestWorkflowEnvironment as RealTestWorkflowEnvironment, TimeSkippingTestWorkflowEnvironmentOptions, @@ -213,6 +214,12 @@ export class TestWorkflowEnvironment extends RealTestWorkflowEnvironment { : undefined), }); } + + static async createExistingServer( + opts?: ExistingServerTestWorkflowEnvironmentOptions + ): Promise { + return RealTestWorkflowEnvironment.createExistingServer(opts); + } } // Some of our tests expect "default custom search attributes" to exists, which used to be the case From a17322a18e3e75049845f1c1a1def51d66d13377 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 14 Apr 2025 16:30:21 -0700 Subject: [PATCH 05/14] Versioning behavior passed through properly / definition changes --- packages/common/src/index.ts | 1 + packages/common/src/interfaces.ts | 1 + packages/common/src/worker-deployments.ts | 8 +- .../common/src/workflow-definition-options.ts | 18 ++ packages/common/src/workflow-options.ts | 9 +- .../src/deployment-versioning-v1/index.ts | 11 ++ .../src/deployment-versioning-v2/index.ts | 11 ++ .../src/deployment-versioning-v3/index.ts | 11 ++ .../src/test-worker-deployment-versioning.ts | 181 ++++++++++++++---- packages/test/src/workflows/definitions.ts | 3 +- .../src/workflows/deployment-versioning.ts | 16 -- packages/worker/src/worker.ts | 2 +- packages/workflow/src/interfaces.ts | 20 +- packages/workflow/src/internals.ts | 15 +- packages/workflow/src/worker-interface.ts | 7 +- packages/workflow/src/workflow.ts | 8 +- 16 files changed, 242 insertions(+), 80 deletions(-) create mode 100644 packages/common/src/workflow-definition-options.ts create mode 100644 packages/test/src/deployment-versioning-v1/index.ts create mode 100644 packages/test/src/deployment-versioning-v2/index.ts create mode 100644 packages/test/src/deployment-versioning-v3/index.ts delete mode 100644 packages/test/src/workflows/deployment-versioning.ts diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 04e963079..7aecd6dc7 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -23,6 +23,7 @@ export * from './priority'; export * from './retry-policy'; export type { Timestamp, Duration, StringValue } from './time'; export * from './worker-deployments'; +export * from './workflow-definition-options'; export * from './workflow-handle'; export * from './workflow-options'; export * from './versioning-intent'; diff --git a/packages/common/src/interfaces.ts b/packages/common/src/interfaces.ts index 34becc49b..f66a7ed82 100644 --- a/packages/common/src/interfaces.ts +++ b/packages/common/src/interfaces.ts @@ -1,4 +1,5 @@ import type { temporal } from '@temporalio/proto'; +import { WorkflowFunctionWithOptions } from './workflow-definition-options'; export type Payload = temporal.api.common.v1.IPayload; diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index 8023be01f..4722dd9ea 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -4,8 +4,12 @@ * @experimental Worker deployments are experimental */ export interface WorkerDeploymentVersion { - buildId: string; - deploymentName: string; + readonly buildId: string; + readonly deploymentName: string; +} + +export function toCanonicalString(version: WorkerDeploymentVersion): string { + return `${version.deploymentName}.${version.buildId}`; } export type VersioningBehavior = 'pinned' | 'auto-upgrade'; diff --git a/packages/common/src/workflow-definition-options.ts b/packages/common/src/workflow-definition-options.ts new file mode 100644 index 000000000..979d2b20a --- /dev/null +++ b/packages/common/src/workflow-definition-options.ts @@ -0,0 +1,18 @@ +import { VersioningBehavior } from './worker-deployments'; + +/** + * Options that can be used when defining a workflow via {@link defineWorkflowWithOptions}. + */ +export interface WorkflowDefinitionOptions { + versioningBehavior?: VersioningBehavior; +} + +type AsyncFunction = (...args: Args) => Promise; + +/** + * A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}. + */ +export interface WorkflowFunctionWithOptions extends AsyncFunction { + __temporal_is_workflow_function_with_options: true; + options: WorkflowDefinitionOptions; +} diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index aeb3aa349..52d06bc3d 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -5,6 +5,7 @@ import { Duration } from './time'; import { makeProtoEnumConverters } from './internal-workflow'; import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes'; import { Priority } from './priority'; +import { WorkflowFunctionWithOptions } from './workflow-definition-options'; /** * Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow. @@ -243,7 +244,9 @@ export interface WorkflowDurationOptions { export type CommonWorkflowOptions = BaseWorkflowOptions & WorkflowDurationOptions; -export function extractWorkflowType(workflowTypeOrFunc: string | T): string { +export function extractWorkflowType( + workflowTypeOrFunc: string | T | WorkflowFunctionWithOptions +): string { if (typeof workflowTypeOrFunc === 'string') return workflowTypeOrFunc as string; if (typeof workflowTypeOrFunc === 'function') { if (workflowTypeOrFunc?.name) return workflowTypeOrFunc.name; @@ -253,3 +256,7 @@ export function extractWorkflowType(workflowTypeOrFunc: stri `Invalid workflow type: expected either a string or a function, got '${typeof workflowTypeOrFunc}'` ); } + +export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions { + return obj.__temporal_is_workflow_function_with_options === true; +} diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts new file mode 100644 index 000000000..955bf5710 --- /dev/null +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -0,0 +1,11 @@ +import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning); +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v1'); + await condition(() => doFinish); + return 'version-v1'; +} diff --git a/packages/test/src/deployment-versioning-v2/index.ts b/packages/test/src/deployment-versioning-v2/index.ts new file mode 100644 index 000000000..d10ca5d24 --- /dev/null +++ b/packages/test/src/deployment-versioning-v2/index.ts @@ -0,0 +1,11 @@ +import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, deploymentVersioning); +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v2'); + await condition(() => doFinish); + return 'version-v2'; +} diff --git a/packages/test/src/deployment-versioning-v3/index.ts b/packages/test/src/deployment-versioning-v3/index.ts new file mode 100644 index 000000000..eab1060fb --- /dev/null +++ b/packages/test/src/deployment-versioning-v3/index.ts @@ -0,0 +1,11 @@ +import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning); +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v3'); + await condition(() => doFinish); + return 'version-v3'; +} diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index 4584b8827..5431a70bc 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -9,8 +9,9 @@ import asyncRetry from 'async-retry'; import { Client } from '@temporalio/client'; import { Worker } from './helpers'; import * as activities from './activities'; -import { WorkerDeploymentVersion } from '@temporalio/common'; +import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; import { makeTestFunction } from './helpers-integration'; +import { unblockSignal, versionQuery } from './workflows/'; const test = makeTestFunction({ workflowsPath: __filename }); @@ -33,7 +34,7 @@ test('Worker deployment based versioning', async (t) => { }; const worker1 = await Worker.create({ - workflowsPath: require.resolve('./workflows'), + workflowsPath: require.resolve('./deployment-versioning-v1'), activities, taskQueue, workerDeploymentOptions: { @@ -47,7 +48,7 @@ test('Worker deployment based versioning', async (t) => { }); const worker2 = await Worker.create({ - workflowsPath: require.resolve('./workflows'), + workflowsPath: require.resolve('./deployment-versioning-v2'), activities, taskQueue, workerDeploymentOptions: { @@ -61,7 +62,7 @@ test('Worker deployment based versioning', async (t) => { }); const worker3 = await Worker.create({ - workflowsPath: require.resolve('./workflows'), + workflowsPath: require.resolve('./deployment-versioning-v3'), activities, taskQueue, workerDeploymentOptions: { @@ -79,11 +80,11 @@ test('Worker deployment based versioning', async (t) => { await setCurrentDeploymentVersion(client, describeResp1.conflictToken, w1DeploymentVersion); // Start workflow 1 which will use the 1.0 worker on auto-upgrade - const wf1 = await client.workflow.start('autoUpgradeWorkflow', { + const wf1 = await client.workflow.start('deploymentVersioning', { taskQueue, - workflowId: 'basic-versioning-v1-' + randomUUID(), + workflowId: 'deployment-versioning-v1-' + randomUUID(), }); - const state1 = await wf1.query('state'); + const state1 = await wf1.query(versionQuery); assert.equal(state1, 'v1'); // Wait for worker 2 to be visible and set as current version @@ -91,11 +92,11 @@ test('Worker deployment based versioning', async (t) => { await setCurrentDeploymentVersion(client, describeResp2.conflictToken, w2DeploymentVersion); // Start workflow 2 which will use the 2.0 worker pinned - const wf2 = await client.workflow.start('pinnedWorkflow', { + const wf2 = await client.workflow.start('deploymentVersioning', { taskQueue, - workflowId: 'basic-versioning-v2-' + randomUUID(), + workflowId: 'deployment-versioning-v2-' + randomUUID(), }); - const state2 = await wf2.query('state'); + const state2 = await wf2.query(versionQuery); assert.equal(state2, 'v2'); // Wait for worker 3 to be visible and set as current version @@ -103,17 +104,17 @@ test('Worker deployment based versioning', async (t) => { await setCurrentDeploymentVersion(client, describeResp3.conflictToken, w3DeploymentVersion); // Start workflow 3 which will use the 3.0 worker on auto-upgrade - const wf3 = await client.workflow.start('autoUpgradeWorkflow', { + const wf3 = await client.workflow.start('deploymentVersioning', { taskQueue, - workflowId: 'basic-versioning-v3-' + randomUUID(), + workflowId: 'deployment-versioning-v3-' + randomUUID(), }); - const state3 = await wf3.query('state'); + const state3 = await wf3.query(versionQuery); assert.equal(state3, 'v3'); // Signal all workflows to finish - await wf1.signal('doFinish'); - await wf2.signal('doFinish'); - await wf3.signal('doFinish'); + await wf1.signal(unblockSignal); + await wf2.signal(unblockSignal); + await wf3.signal(unblockSignal); const res1 = await wf1.result(); const res2 = await wf2.result(); @@ -132,27 +133,139 @@ test('Worker deployment based versioning', async (t) => { t.pass(); }); +test('Worker deployment based versioning with ramping', async (t) => { + const taskQueue = 'worker-deployment-based-ramping-' + randomUUID(); + const deploymentName = 'deployment-ramping-' + randomUUID(); + const client = t.context.env.client; + + const v1 = { + buildId: '1.0', + deploymentName: deploymentName, + }; + const v2 = { + buildId: '2.0', + deploymentName: deploymentName, + }; + + const worker1 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v1'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: v1, + }, + }); + const worker1Promise = worker1.run(); + worker1Promise.catch((err) => { + t.fail('Worker 1.0 run error: ' + JSON.stringify(err)); + }); + + const worker2 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v2'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: v2, + }, + }); + const worker2Promise = worker2.run(); + worker2Promise.catch((err) => { + t.fail('Worker 2.0 run error: ' + JSON.stringify(err)); + }); + + // Wait for worker deployments to be visible + await waitUntilWorkerDeploymentVisible(client, v1); + const describeResp = await waitUntilWorkerDeploymentVisible(client, v2); + + // Set current version to v1 and ramp v2 to 100% + let conflictToken = (await setCurrentDeploymentVersion(client, describeResp.conflictToken, v1)).conflictToken; + conflictToken = (await setRampingVersion(client, conflictToken, v2, 100)).conflictToken; + + // Run workflows and verify they run on v2 + for (let i = 0; i < 3; i++) { + const wf = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: `versioning-ramp-100-${i}-${randomUUID()}`, + }); + await wf.signal(unblockSignal); + const res = await wf.result(); + assert.equal(res, 'version-v2'); + } + + // Set ramp to 0, expecting workflows to run on v1 + conflictToken = (await setRampingVersion(client, conflictToken, v2, 0)).conflictToken; + for (let i = 0; i < 3; i++) { + const wf = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: `versioning-ramp-0-${i}-${randomUUID()}`, + }); + await wf.signal(unblockSignal); + const res = await wf.result(); + assert.equal(res, 'version-v1'); + } + + // Set ramp to 50 and eventually verify workflows run on both versions + await setRampingVersion(client, conflictToken, v2, 50); + const seenResults = new Set(); + + const runAndRecord = async () => { + const wf = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: `versioning-ramp-50-${randomUUID()}`, + }); + await wf.signal(unblockSignal); + return await wf.result(); + }; + + await asyncRetry( + async () => { + const res = await runAndRecord(); + seenResults.add(res); + if (!seenResults.has('version-v1') || !seenResults.has('version-v2')) { + throw new Error('Not all versions seen yet'); + } + }, + { maxTimeout: 1000, retries: 20 } + ); + + worker1.shutdown(); + worker2.shutdown(); + await worker1Promise; + await worker2Promise; + t.pass(); +}); + +async function setRampingVersion( + client: Client, + conflictToken: Uint8Array, + version: WorkerDeploymentVersion, + percentage: number +) { + return await client.workflowService.setWorkerDeploymentRampingVersion({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + version: toCanonicalString(version), + conflictToken, + percentage, + }); +} + async function waitUntilWorkerDeploymentVisible(client: Client, version: WorkerDeploymentVersion) { return await asyncRetry( async () => { - try { - const resp = await client.workflowService.describeWorkerDeployment({ - namespace: client.options.namespace, - deploymentName: version.deploymentName, - }); - - const isVersionVisible = resp.workerDeploymentInfo!.versionSummaries!.some( - (vs) => vs.version === version.buildId - ); - - if (!isVersionVisible) { - throw new Error('Version not visible yet'); - } - - return resp; - } catch (error) { - throw error; + const resp = await client.workflowService.describeWorkerDeployment({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + }); + const isVersionVisible = resp.workerDeploymentInfo!.versionSummaries!.some( + (vs) => vs.version === toCanonicalString(version) + ); + if (!isVersionVisible) { + throw new Error('Version not visible yet'); } + return resp; }, { maxTimeout: 1000, retries: 10 } ); @@ -166,7 +279,7 @@ async function setCurrentDeploymentVersion( return await client.workflowService.setWorkerDeploymentCurrentVersion({ namespace: client.options.namespace, deploymentName: version.deploymentName, - version: version.buildId, + version: toCanonicalString(version), conflictToken, }); } diff --git a/packages/test/src/workflows/definitions.ts b/packages/test/src/workflows/definitions.ts index e5aa77e01..b1056fda1 100644 --- a/packages/test/src/workflows/definitions.ts +++ b/packages/test/src/workflows/definitions.ts @@ -1,8 +1,9 @@ /* eslint-disable no-duplicate-imports */ -import { defineSignal } from '@temporalio/workflow'; +import { defineQuery, defineSignal } from '@temporalio/workflow'; export const activityStartedSignal = defineSignal('activityStarted'); export const failSignal = defineSignal('fail'); export const failWithMessageSignal = defineSignal<[string]>('fail'); export const argsTestSignal = defineSignal<[number, string]>('argsTest'); export const unblockSignal = defineSignal('unblock'); +export const versionQuery = defineQuery('version'); diff --git a/packages/test/src/workflows/deployment-versioning.ts b/packages/test/src/workflows/deployment-versioning.ts deleted file mode 100644 index 5c9312c66..000000000 --- a/packages/test/src/workflows/deployment-versioning.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { CancelledFailure, defineQuery, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; -import { unblockSignal } from './definitions'; - -export const versionQuery = defineQuery('version'); - -export const deploymentVersioningV1AutoUpgrade = defineWorkflowWithOptions( - { versioningBehavior: 'auto-upgrade' }, - _deploymentVersioningV1AutoUpgrade -); -async function _deploymentVersioningV1AutoUpgrade(): Promise { - let doFinish = false; - setHandler(unblockSignal, () => void (doFinish = true)); - setHandler(versionQuery, () => 'v1'); - await condition(() => doFinish); - return 'version-v1'; -} diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index f10593bb8..eed074867 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1262,7 +1262,7 @@ export class Worker { priority, } = initWorkflowJob; - // Note that we can't do payload convertion here, as there's no guarantee that converted payloads would be safe to + // Note that we can't do payload conversion here, as there's no guarantee that converted payloads would be safe to // transfer through the V8 message port. Those will therefore be set in the Activator's initializeWorkflow job handler. const workflowInfo: WorkflowInfo = { workflowId, diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 63b82efaf..dc6c511ef 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -17,7 +17,7 @@ import { } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; -import type { coresdk } from '@temporalio/proto'; +import type { coresdk, temporal } from '@temporalio/proto'; /** * Workflow Execution information @@ -636,21 +636,5 @@ export type UpdateHandlerOptions = { export interface ActivationCompletion { commands: coresdk.workflow_commands.IWorkflowCommand[]; usedInternalFlags: number[]; -} - -/** - * Options that can be used when defining a workflow via {@link defineWorkflowWithOptions}. - */ -export interface WorkflowDefinitionOptions { - versioningBehavior?: VersioningBehavior; -} - -type AsyncFunction = (...args: Args) => Promise; - -/** - * A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}. - */ -export interface WorkflowFunctionWithOptions extends AsyncFunction { - __temporal_is_workflow_function_with_options: true; - options: WorkflowDefinitionOptions; + versioningBehavior?: temporal.api.enums.v1.VersioningBehavior; } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 75f7f2d91..b31ef00d5 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -20,6 +20,8 @@ import { WorkflowUpdateValidatorType, mapFromPayloads, fromPayloadsAtIndex, + WorkflowFunctionWithOptions, + VersioningBehavior, } from '@temporalio/common'; import { decodeSearchAttributes, @@ -27,7 +29,7 @@ import { } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; -import type { coresdk, temporal } from '@temporalio/proto'; +import { coresdk, temporal } from '@temporalio/proto'; import { alea, RNG } from './alea'; import { RootCancellationScope } from './cancellation-scope'; import { UpdateScope } from './update-scope'; @@ -382,7 +384,7 @@ export class Activator implements ActivationHandler { /** * Reference to the current Workflow, initialized when a Workflow is started */ - public workflow?: Workflow; + public workflow?: Workflow | WorkflowFunctionWithOptions; /** * Information about the current Workflow @@ -424,6 +426,8 @@ export class Activator implements ActivationHandler { public readonly registeredActivityNames: Set; + public versioningBehavior?: VersioningBehavior; + constructor({ info, now, @@ -493,9 +497,16 @@ export class Activator implements ActivationHandler { } concludeActivation(): ActivationCompletion { + let versioningBehavior; + if (this.versioningBehavior === 'auto-upgrade') { + versioningBehavior = temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE; + } else if (this.versioningBehavior === 'pinned') { + versioningBehavior = temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED; + } return { commands: this.commands.splice(0), usedInternalFlags: [...this.knownFlags], + versioningBehavior: versioningBehavior, }; } diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 0d902c565..80bb0df8b 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -3,7 +3,7 @@ * * @module */ -import { IllegalStateError } from '@temporalio/common'; +import { IllegalStateError, isWorkflowFunctionWithOptions } from '@temporalio/common'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { coresdk } from '@temporalio/proto'; import { disableStorage } from './cancellation-scope'; @@ -79,7 +79,10 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { const workflowFn = mod[activator.info.workflowType]; const defaultWorkflowFn = mod['default']; - if (typeof workflowFn === 'function') { + if (isWorkflowFunctionWithOptions(workflowFn)) { + activator.workflow = workflowFn; + activator.versioningBehavior = workflowFn.options.versioningBehavior; + } else if (typeof workflowFn === 'function') { activator.workflow = workflowFn; } else if (typeof defaultWorkflowFn === 'function') { activator.workflow = defaultWorkflowFn; diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 3de31f7c4..0cb61ee10 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -59,13 +59,12 @@ import { encodeParentClosePolicy, DefaultUpdateHandler, DefaultQueryHandler, - WorkflowDefinitionOptions, - WorkflowFunctionWithOptions, } from './interfaces'; import { LocalActivityDoBackoff } from './errors'; import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes'; import { untrackPromise } from './stack-helpers'; import { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle'; +import { WorkflowDefinitionOptions, WorkflowFunctionWithOptions } from '@temporalio/common'; // Avoid a circular dependency registerSleepImplementation(sleep); @@ -1628,7 +1627,10 @@ export function defineWorkflowWithOptions( options: WorkflowDefinitionOptions, fn: (...args: A) => Promise ): WorkflowFunctionWithOptions { - const wrappedFn = Object.assign(fn, { options, __temporal_is_workflow_function_with_options: true as const }); + const wrappedFn = Object.assign(fn, { + options, + __temporal_is_workflow_function_with_options: true as const, + }); return wrappedFn; } From 83c70fd17157a8d50e2063719b85c7ba4e7d60f3 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 15 Apr 2025 16:13:43 -0700 Subject: [PATCH 06/14] Dynamic workflow support --- packages/common/src/workflow-options.ts | 1 + .../src/deployment-versioning-v1/index.ts | 6 + .../src/test-worker-deployment-versioning.ts | 116 ++++++++++++++++++ packages/worker/src/worker-options.ts | 7 +- packages/workflow/src/worker-interface.ts | 3 + 5 files changed, 129 insertions(+), 4 deletions(-) diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index 52d06bc3d..d725cd506 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -258,5 +258,6 @@ export function extractWorkflowType( } export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions { + if (obj === undefined || obj === null) return false; return obj.__temporal_is_workflow_function_with_options === true; } diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts index 955bf5710..889f9425d 100644 --- a/packages/test/src/deployment-versioning-v1/index.ts +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -9,3 +9,9 @@ export async function deploymentVersioning(): Promise { await condition(() => doFinish); return 'version-v1'; } + +// Dynamic/default workflow handler +export default defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, _default); +async function _default(): Promise { + return 'dynamic'; +} diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index 5431a70bc..dfef4d367 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -12,6 +12,7 @@ import * as activities from './activities'; import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; import { makeTestFunction } from './helpers-integration'; import { unblockSignal, versionQuery } from './workflows/'; +import { temporal } from '@temporalio/proto'; const test = makeTestFunction({ workflowsPath: __filename }); @@ -40,6 +41,7 @@ test('Worker deployment based versioning', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: w1DeploymentVersion, + defaultVersioningBehavior: 'pinned', }, }); const worker1Promise = worker1.run(); @@ -54,6 +56,7 @@ test('Worker deployment based versioning', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: w2DeploymentVersion, + defaultVersioningBehavior: 'pinned', }, }); const worker2Promise = worker2.run(); @@ -68,6 +71,7 @@ test('Worker deployment based versioning', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: w3DeploymentVersion, + defaultVersioningBehavior: 'pinned', }, }); const worker3Promise = worker3.run(); @@ -154,6 +158,7 @@ test('Worker deployment based versioning with ramping', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: v1, + defaultVersioningBehavior: 'pinned', }, }); const worker1Promise = worker1.run(); @@ -168,6 +173,7 @@ test('Worker deployment based versioning with ramping', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: v2, + defaultVersioningBehavior: 'pinned', }, }); const worker2Promise = worker2.run(); @@ -237,6 +243,116 @@ test('Worker deployment based versioning with ramping', async (t) => { t.pass(); }); +test('Worker deployment with dynamic workflow on run', async (t) => { + if (t.context.env.supportsTimeSkipping) { + t.pass("Test Server doesn't support worker deployments"); + return; + } + + const taskQueue = 'worker-deployment-dynamic-' + randomUUID(); + const deploymentName = 'deployment-dynamic-' + randomUUID(); + const client = t.context.env.client; + + const version = { + buildId: '1.0', + deploymentName: deploymentName, + }; + + const worker = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v1'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: version, + defaultVersioningBehavior: 'auto-upgrade', + }, + }); + + const workerPromise = worker.run(); + workerPromise.catch((err) => { + t.fail('Worker run error: ' + JSON.stringify(err)); + }); + + const describeResp = await waitUntilWorkerDeploymentVisible(client, version); + await setCurrentDeploymentVersion(client, describeResp.conflictToken, version); + + const wf = await client.workflow.start('cooldynamicworkflow', { + taskQueue, + workflowId: 'dynamic-workflow-versioning-' + randomUUID(), + }); + + const result = await wf.result(); + assert.equal(result, 'dynamic'); + + // Check history for versioning behavior + const history = await wf.fetchHistory(); + + const hasPinnedVersioningBehavior = history.events!.some( + (event) => + event.workflowTaskCompletedEventAttributes && + event.workflowTaskCompletedEventAttributes.versioningBehavior === + temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED + ); + assert.ok(hasPinnedVersioningBehavior, 'Expected workflow to use pinned versioning behavior'); + + worker.shutdown(); + await workerPromise; + t.pass(); +}); + +test('Workflows can use default versioning behavior', async (t) => { + const taskQueue = 'task-queue-default-versioning-' + randomUUID(); + const deploymentName = 'deployment-default-versioning-' + randomUUID(); + const client = t.context.env.client; + + const workerV1 = { + buildId: '1.0', + deploymentName: deploymentName, + }; + + const worker = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-no-annotations'), + activities, + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: workerV1, + defaultVersioningBehavior: 'pinned', + }, + }); + + const workerPromise = worker.run(); + workerPromise.catch((err) => { + t.fail('Worker run error: ' + JSON.stringify(err)); + }); + + const describeResp = await waitUntilWorkerDeploymentVisible(client, workerV1); + await setCurrentDeploymentVersion(client, describeResp.conflictToken, workerV1); + + const wf = await client.workflow.start('noVersioningAnnotationWorkflow', { + taskQueue, + workflowId: 'default-versioning-behavior-' + randomUUID(), + }); + + await wf.result(); + + // Check history for versioning behavior + const history = await wf.fetchHistory(); + + const hasPinnedVersioningBehavior = history.events!.some( + (event) => + event.workflowTaskCompletedEventAttributes && + event.workflowTaskCompletedEventAttributes.versioningBehavior === + temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED + ); + assert.ok(hasPinnedVersioningBehavior, 'Expected workflow to use pinned versioning behavior'); + + worker.shutdown(); + await workerPromise; + t.pass(); +}); + async function setRampingVersion( client: Client, conflictToken: Uint8Array, diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 4a1219dd3..39c7a971e 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -100,11 +100,10 @@ export interface WorkerOptions { useWorkerVersioning: boolean; /** - * If specified, the default versioning behavior to use for all workflows on this worker. - * If not specified, and `useWorkerVersioning` is true, workflows that do not specify a - * versioning behavior via {@link TODO} will cause an error to be thrown on startup. + * The default versioning behavior to use for all workflows on this worker. Specifying a default + * behavior is required, */ - defaultVersioningBehavior?: VersioningBehavior; + defaultVersioningBehavior: VersioningBehavior; }; /** diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 80bb0df8b..a9db0cf9f 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -82,6 +82,9 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { if (isWorkflowFunctionWithOptions(workflowFn)) { activator.workflow = workflowFn; activator.versioningBehavior = workflowFn.options.versioningBehavior; + } else if (isWorkflowFunctionWithOptions(defaultWorkflowFn)) { + activator.workflow = defaultWorkflowFn; + activator.versioningBehavior = defaultWorkflowFn.options.versioningBehavior; } else if (typeof workflowFn === 'function') { activator.workflow = workflowFn; } else if (typeof defaultWorkflowFn === 'function') { From 6b2d94db4edfc5a96006eaa6dc2ae7e65d3018d0 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 15 Apr 2025 17:13:05 -0700 Subject: [PATCH 07/14] Address todos / add missed new file --- packages/common/src/worker-deployments.ts | 15 +++++++- .../index.ts | 14 +++++++ packages/test/src/helpers-integration.ts | 5 --- .../src/test-worker-deployment-versioning.ts | 24 ++++++------ packages/worker/src/worker-options.ts | 2 +- packages/worker/src/workflow/vm-shared.ts | 2 +- packages/workflow/src/interfaces.ts | 2 + packages/workflow/src/internals.ts | 2 +- packages/workflow/src/workflow.ts | 38 +++++++++++++++---- 9 files changed, 74 insertions(+), 30 deletions(-) create mode 100644 packages/test/src/deployment-versioning-no-annotations/index.ts diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index 4722dd9ea..af7d3e4e8 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -1,15 +1,28 @@ /** * Represents the version of a specific worker deployment. * - * @experimental Worker deployments are experimental + * @experimental Deployment based versioning is experimental and may change in the future. */ export interface WorkerDeploymentVersion { readonly buildId: string; readonly deploymentName: string; } +/** + * @returns The canonical representation of a deployment version, which is a string in the format + * `deploymentName.buildId`. + */ export function toCanonicalString(version: WorkerDeploymentVersion): string { return `${version.deploymentName}.${version.buildId}`; } +/** + * Specifies when a workflow might move from a worker of one Build Id to another. + * + * * 'pinned' - The workflow will be pinned to the current Build ID unless manually moved. + * * 'auto-upgrade' - The workflow will automatically move to the latest version (default Build ID + * of the task queue) when the next task is dispatched. + * + * @experimental Deployment based versioning is experimental and may change in the future. + */ export type VersioningBehavior = 'pinned' | 'auto-upgrade'; diff --git a/packages/test/src/deployment-versioning-no-annotations/index.ts b/packages/test/src/deployment-versioning-no-annotations/index.ts new file mode 100644 index 000000000..d476dfd42 --- /dev/null +++ b/packages/test/src/deployment-versioning-no-annotations/index.ts @@ -0,0 +1,14 @@ +import { setHandler, condition } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v1'); + await condition(() => doFinish); + return 'version-v1'; +} + +export default async function (): Promise { + return 'dynamic'; +} diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index b7ddfd740..743d1f821 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -101,11 +101,6 @@ export async function createLocalTestEnvironment( ...(opts || {}), // Use provided options or default to an empty object server: { searchAttributes: Object.values(defaultSAKeys), - // TODO: Remove after next CLI release - executable: { - type: 'cached-download', - version: 'v1.3.1-priority.0', - }, ...(opts?.server || {}), // Use provided server options or default to an empty object extraArgs: [ ...defaultDynamicConfigOptions.flatMap((opt) => ['--dynamic-config-value', opt]), diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index dfef4d367..a2786537a 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -7,12 +7,12 @@ import assert from 'assert'; import { randomUUID } from 'crypto'; import asyncRetry from 'async-retry'; import { Client } from '@temporalio/client'; +import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; +import { temporal } from '@temporalio/proto'; import { Worker } from './helpers'; import * as activities from './activities'; -import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; import { makeTestFunction } from './helpers-integration'; -import { unblockSignal, versionQuery } from './workflows/'; -import { temporal } from '@temporalio/proto'; +import { unblockSignal, versionQuery } from './workflows'; const test = makeTestFunction({ workflowsPath: __filename }); @@ -23,15 +23,15 @@ test('Worker deployment based versioning', async (t) => { const w1DeploymentVersion = { buildId: '1.0', - deploymentName: deploymentName, + deploymentName, }; const w2DeploymentVersion = { buildId: '2.0', - deploymentName: deploymentName, + deploymentName, }; const w3DeploymentVersion = { buildId: '3.0', - deploymentName: deploymentName, + deploymentName, }; const worker1 = await Worker.create({ @@ -144,11 +144,11 @@ test('Worker deployment based versioning with ramping', async (t) => { const v1 = { buildId: '1.0', - deploymentName: deploymentName, + deploymentName, }; const v2 = { buildId: '2.0', - deploymentName: deploymentName, + deploymentName, }; const worker1 = await Worker.create({ @@ -255,7 +255,7 @@ test('Worker deployment with dynamic workflow on run', async (t) => { const version = { buildId: '1.0', - deploymentName: deploymentName, + deploymentName, }; const worker = await Worker.create({ @@ -264,7 +264,7 @@ test('Worker deployment with dynamic workflow on run', async (t) => { taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, - version: version, + version, defaultVersioningBehavior: 'auto-upgrade', }, }); @@ -308,7 +308,7 @@ test('Workflows can use default versioning behavior', async (t) => { const workerV1 = { buildId: '1.0', - deploymentName: deploymentName, + deploymentName, }; const worker = await Worker.create({ @@ -337,9 +337,7 @@ test('Workflows can use default versioning behavior', async (t) => { await wf.result(); - // Check history for versioning behavior const history = await wf.fetchHistory(); - const hasPinnedVersioningBehavior = history.events!.some( (event) => event.workflowTaskCompletedEventAttributes && diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 39c7a971e..51d855d65 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -85,7 +85,7 @@ export interface WorkerOptions { /** * Deployment options for the worker. Exclusive with `build_id` and `use_worker_versioning`. - + * * @experimental Deployment based versioning is still experimental. */ workerDeploymentOptions?: { diff --git a/packages/worker/src/workflow/vm-shared.ts b/packages/worker/src/workflow/vm-shared.ts index 165f3ce6d..96b76fd44 100644 --- a/packages/worker/src/workflow/vm-shared.ts +++ b/packages/worker/src/workflow/vm-shared.ts @@ -14,9 +14,9 @@ import * as internals from '@temporalio/workflow/lib/worker-interface'; import { Activator } from '@temporalio/workflow/lib/internals'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { UnhandledRejectionError } from '../errors'; +import { convertDeploymentVersion } from '../utils'; import { Workflow } from './interface'; import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input'; -import { convertDeploymentVersion } from '../utils'; // Best effort to catch unhandled rejections from workflow code. // We crash the thread if we cannot find the culprit. diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index dc6c511ef..c868ee7c6 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -208,6 +208,8 @@ export interface WorkflowInfo { * executing this task for the first time and has a Deployment Version set, then its ID will be * used. This value may change over the lifetime of the workflow run, but is deterministic and * safe to use for branching. + * + * @experimental Deployment based versioning is experimental and may change in the future. */ readonly currentDeploymentVersion?: WorkerDeploymentVersion; diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index b31ef00d5..ec996c52d 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -506,7 +506,7 @@ export class Activator implements ActivationHandler { return { commands: this.commands.splice(0), usedInternalFlags: [...this.knownFlags], - versioningBehavior: versioningBehavior, + versioningBehavior, }; } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 0cb61ee10..1dc5773d0 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -32,6 +32,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; +import { WorkflowDefinitionOptions, WorkflowFunctionWithOptions } from '@temporalio/common'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { @@ -64,7 +65,6 @@ import { LocalActivityDoBackoff } from './errors'; import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes'; import { untrackPromise } from './stack-helpers'; import { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle'; -import { WorkflowDefinitionOptions, WorkflowFunctionWithOptions } from '@temporalio/common'; // Avoid a circular dependency registerSleepImplementation(sleep); @@ -1616,13 +1616,35 @@ export function allHandlersFinished(): boolean { } /** -* Can be used to define workflow functions with certain options specified at definition time. - -* @param options Options for the workflow defintion. -* @param fn The workflow function. -* @returns The same passed in workflow function, with the specified options applied. You can export -* this function to make it available as a workflow function. -*/ + * Can be used to alter or define workflow functions with certain options specified at definition + * time. In order to ensure that workflows are loaded properly by their name, you typically will not + * need to use the return value of this function. + * + * @example + * For example: + * ```ts + * defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, myWorkflow); + * export async function myWorkflow(): Promise { + * // Workflow code here + * return "hi"; + * } + * ``` + * + * @example + * To annotate a default or dynamic workflow: + * ```ts + * export default defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, myDefaultWorkflow); + * async function myDefaultWorkflow(): Promise { + * // Workflow code here + * return "hi"; + * } + * ``` + * + * @param options Options for the workflow defintion. + * @param fn The workflow function. + * @returns The same passed in workflow function, with the specified options applied. You can export + * this function to make it available as a workflow function. + */ export function defineWorkflowWithOptions( options: WorkflowDefinitionOptions, fn: (...args: A) => Promise From 4a72dfead102bd2f132cddf472cc98ff7370b3b8 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 17 Apr 2025 10:04:13 -0700 Subject: [PATCH 08/14] Fix review comments / lints --- packages/common/src/interfaces.ts | 1 - packages/common/src/workflow-options.ts | 3 ++- packages/test/src/deployment-versioning-v1/index.ts | 2 +- packages/test/src/deployment-versioning-v2/index.ts | 2 +- packages/test/src/deployment-versioning-v3/index.ts | 2 +- packages/test/src/test-integration-split-one.ts | 2 +- packages/test/src/test-integration-workflows.ts | 2 +- packages/test/src/test-sinks.ts | 2 +- packages/test/src/test-worker-deployment-versioning.ts | 8 -------- packages/worker/src/utils.ts | 2 +- packages/worker/src/worker-options.ts | 4 ++-- packages/worker/src/worker.ts | 3 ++- packages/workflow/src/flags.ts | 2 +- packages/workflow/src/interfaces.ts | 1 - packages/workflow/src/workflow.ts | 3 ++- 15 files changed, 16 insertions(+), 23 deletions(-) diff --git a/packages/common/src/interfaces.ts b/packages/common/src/interfaces.ts index f66a7ed82..34becc49b 100644 --- a/packages/common/src/interfaces.ts +++ b/packages/common/src/interfaces.ts @@ -1,5 +1,4 @@ import type { temporal } from '@temporalio/proto'; -import { WorkflowFunctionWithOptions } from './workflow-definition-options'; export type Payload = temporal.api.common.v1.IPayload; diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index d725cd506..4bb477f82 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -257,7 +257,8 @@ export function extractWorkflowType( ); } +/* eslint-disable @typescript-eslint/explicit-module-boundary-types */ export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions { - if (obj === undefined || obj === null) return false; + if (obj == null) return false; return obj.__temporal_is_workflow_function_with_options === true; } diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts index 889f9425d..f414dcc1b 100644 --- a/packages/test/src/deployment-versioning-v1/index.ts +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -1,4 +1,4 @@ -import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning); diff --git a/packages/test/src/deployment-versioning-v2/index.ts b/packages/test/src/deployment-versioning-v2/index.ts index d10ca5d24..a2f793633 100644 --- a/packages/test/src/deployment-versioning-v2/index.ts +++ b/packages/test/src/deployment-versioning-v2/index.ts @@ -1,4 +1,4 @@ -import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, deploymentVersioning); diff --git a/packages/test/src/deployment-versioning-v3/index.ts b/packages/test/src/deployment-versioning-v3/index.ts index eab1060fb..5e40fd6dc 100644 --- a/packages/test/src/deployment-versioning-v3/index.ts +++ b/packages/test/src/deployment-versioning-v3/index.ts @@ -1,4 +1,4 @@ -import { CancelledFailure, setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning); diff --git a/packages/test/src/test-integration-split-one.ts b/packages/test/src/test-integration-split-one.ts index 115285c14..59366dc2c 100644 --- a/packages/test/src/test-integration-split-one.ts +++ b/packages/test/src/test-integration-split-one.ts @@ -737,7 +737,7 @@ test.serial('Workflow can read WorkflowInfo', configMacro, async (t, config) => historySize: res.historySize, startTime: res.startTime, runStartTime: res.runStartTime, - currentBuildId: res.currentBuildId, + currentBuildId: res.currentBuildId, // eslint-disable-line deprecation/deprecation // unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast unsafe: { isReplaying: false } as UnsafeWorkflowInfo, priority: {}, diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 3d0c44321..ea05b71f6 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -496,7 +496,7 @@ export async function buildIdTester(): Promise { }); workflow.setHandler(getBuildIdQuery, () => { - return workflow.workflowInfo().currentBuildId ?? ''; + return workflow.workflowInfo().currentBuildId ?? ''; // eslint-disable-line deprecation/deprecation }); // The unblock signal will only be sent once we are in Worker 1.1. diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index 64be0cbaa..e6036d601 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -94,7 +94,7 @@ if (RUN_INTEGRATION_TESTS) { }); // Capture volatile values that are hard to predict - const { historySize, startTime, runStartTime, currentBuildId } = recordedCalls[0].info; + const { historySize, startTime, runStartTime, currentBuildId } = recordedCalls[0].info; // eslint-disable-line deprecation/deprecation t.true(historySize > 300); const info: WorkflowInfo = { diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index a2786537a..06d919a3f 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -10,7 +10,6 @@ import { Client } from '@temporalio/client'; import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; import { temporal } from '@temporalio/proto'; import { Worker } from './helpers'; -import * as activities from './activities'; import { makeTestFunction } from './helpers-integration'; import { unblockSignal, versionQuery } from './workflows'; @@ -36,7 +35,6 @@ test('Worker deployment based versioning', async (t) => { const worker1 = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-v1'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, @@ -51,7 +49,6 @@ test('Worker deployment based versioning', async (t) => { const worker2 = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-v2'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, @@ -66,7 +63,6 @@ test('Worker deployment based versioning', async (t) => { const worker3 = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-v3'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, @@ -153,7 +149,6 @@ test('Worker deployment based versioning with ramping', async (t) => { const worker1 = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-v1'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, @@ -168,7 +163,6 @@ test('Worker deployment based versioning with ramping', async (t) => { const worker2 = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-v2'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, @@ -260,7 +254,6 @@ test('Worker deployment with dynamic workflow on run', async (t) => { const worker = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-v1'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, @@ -313,7 +306,6 @@ test('Workflows can use default versioning behavior', async (t) => { const worker = await Worker.create({ workflowsPath: require.resolve('./deployment-versioning-no-annotations'), - activities, taskQueue, workerDeploymentOptions: { useWorkerVersioning: true, diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index 467be6438..2b7932508 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -33,7 +33,7 @@ export function convertToParentWorkflowType( export function convertDeploymentVersion( v: coresdk.common.IWorkerDeploymentVersion | null | undefined ): WorkerDeploymentVersion | undefined { - if (!v) { + if (v == null) { return undefined; } diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 51d855d65..430980492 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -794,8 +794,8 @@ export type CompiledWorkerOptionsWithBuildId = CompiledWorkerOptions & { function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): WorkerOptionsWithDefaults { const { - buildId, - useVersioning, + buildId, // eslint-disable-line deprecation/deprecation + useVersioning, // eslint-disable-line deprecation/deprecation maxCachedWorkflows, showStackTraceSources, namespace, diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index eed074867..d50b63490 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -154,7 +154,8 @@ interface WorkflowWithLogAttributes { } function addBuildIdIfMissing(options: CompiledWorkerOptions, bundleCode?: string): CompiledWorkerOptionsWithBuildId { - if (options.buildId != null) { + const bid = options.buildId; // eslint-disable-line deprecation/deprecation + if (bid != null) { return options as CompiledWorkerOptionsWithBuildId; } const suffix = bundleCode ? `+${crypto.createHash('sha256').update(bundleCode).digest('hex')}` : ''; diff --git a/packages/workflow/src/flags.ts b/packages/workflow/src/flags.ts index 139ee8a5e..1ed209b13 100644 --- a/packages/workflow/src/flags.ts +++ b/packages/workflow/src/flags.ts @@ -72,5 +72,5 @@ type AltConditionFn = (ctx: { info: WorkflowInfo }) => boolean; function buildIdSdkVersionMatches(version: RegExp): AltConditionFn { const regex = new RegExp(`^@temporalio/worker@(${version.source})[+]`); - return ({ info }) => info.currentBuildId != null && regex.test(info.currentBuildId); + return ({ info }) => info.currentBuildId != null && regex.test(info.currentBuildId); // eslint-disable-line deprecation/deprecation } diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index c868ee7c6..d60961992 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -13,7 +13,6 @@ import { SearchAttributePair, Priority, WorkerDeploymentVersion, - VersioningBehavior, } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 1dc5773d0..dc12df83b 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -23,6 +23,8 @@ import { WorkflowUpdateValidatorType, SearchAttributeUpdatePair, compilePriority, + WorkflowDefinitionOptions, + WorkflowFunctionWithOptions, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -32,7 +34,6 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { temporal } from '@temporalio/proto'; -import { WorkflowDefinitionOptions, WorkflowFunctionWithOptions } from '@temporalio/common'; import { CancellationScope, registerSleepImplementation } from './cancellation-scope'; import { UpdateScope } from './update-scope'; import { From 71942f96373ac201fff0eb22d1bb0c86645c7643 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 17 Apr 2025 14:07:32 -0700 Subject: [PATCH 09/14] Use more standard enum representation / fix related errors --- packages/common/src/worker-deployments.ts | 24 ++++++++++++++++++- .../src/deployment-versioning-v1/index.ts | 4 ++-- .../src/deployment-versioning-v2/index.ts | 2 +- .../src/deployment-versioning-v3/index.ts | 2 +- .../src/test-worker-deployment-versioning.ts | 14 +++++------ packages/workflow/src/interfaces.ts | 5 ++-- packages/workflow/src/internals.ts | 10 ++------ packages/workflow/src/worker-interface.ts | 15 +++++++++--- 8 files changed, 51 insertions(+), 25 deletions(-) diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index af7d3e4e8..b9091a8ef 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -1,3 +1,6 @@ +import { temporal } from '@temporalio/proto'; +import { makeProtoEnumConverters } from './internal-workflow'; + /** * Represents the version of a specific worker deployment. * @@ -25,4 +28,23 @@ export function toCanonicalString(version: WorkerDeploymentVersion): string { * * @experimental Deployment based versioning is experimental and may change in the future. */ -export type VersioningBehavior = 'pinned' | 'auto-upgrade'; +export const VersioningBehavior = { + PINNED: 'PINNED', + AUTO_UPGRADE: 'AUTO_UPGRADE', +} as const; +export type VersioningBehavior = (typeof VersioningBehavior)[keyof typeof VersioningBehavior]; + +export const [encodeVersioningBehavior, decodeVersioningBehavior] = makeProtoEnumConverters< + temporal.api.enums.v1.VersioningBehavior, + typeof temporal.api.enums.v1.VersioningBehavior, + keyof typeof temporal.api.enums.v1.VersioningBehavior, + typeof VersioningBehavior, + 'VERSIONING_BEHAVIOR_' +>( + { + [VersioningBehavior.PINNED]: 1, + [VersioningBehavior.AUTO_UPGRADE]: 2, + UNSPECIFIED: 0, + } as const, + 'VERSIONING_BEHAVIOR_' +); diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts index f414dcc1b..7e11c1856 100644 --- a/packages/test/src/deployment-versioning-v1/index.ts +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -1,7 +1,7 @@ import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; -defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning); +defineWorkflowWithOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); export async function deploymentVersioning(): Promise { let doFinish = false; setHandler(unblockSignal, () => void (doFinish = true)); @@ -11,7 +11,7 @@ export async function deploymentVersioning(): Promise { } // Dynamic/default workflow handler -export default defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, _default); +export default defineWorkflowWithOptions({ versioningBehavior: 'PINNED' }, _default); async function _default(): Promise { return 'dynamic'; } diff --git a/packages/test/src/deployment-versioning-v2/index.ts b/packages/test/src/deployment-versioning-v2/index.ts index a2f793633..87fd5863a 100644 --- a/packages/test/src/deployment-versioning-v2/index.ts +++ b/packages/test/src/deployment-versioning-v2/index.ts @@ -1,7 +1,7 @@ import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; -defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, deploymentVersioning); +defineWorkflowWithOptions({ versioningBehavior: 'PINNED' }, deploymentVersioning); export async function deploymentVersioning(): Promise { let doFinish = false; setHandler(unblockSignal, () => void (doFinish = true)); diff --git a/packages/test/src/deployment-versioning-v3/index.ts b/packages/test/src/deployment-versioning-v3/index.ts index 5e40fd6dc..b499779b2 100644 --- a/packages/test/src/deployment-versioning-v3/index.ts +++ b/packages/test/src/deployment-versioning-v3/index.ts @@ -1,7 +1,7 @@ import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; -defineWorkflowWithOptions({ versioningBehavior: 'auto-upgrade' }, deploymentVersioning); +defineWorkflowWithOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); export async function deploymentVersioning(): Promise { let doFinish = false; setHandler(unblockSignal, () => void (doFinish = true)); diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index 06d919a3f..e5eeef8e4 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -39,7 +39,7 @@ test('Worker deployment based versioning', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: w1DeploymentVersion, - defaultVersioningBehavior: 'pinned', + defaultVersioningBehavior: 'PINNED', }, }); const worker1Promise = worker1.run(); @@ -53,7 +53,7 @@ test('Worker deployment based versioning', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: w2DeploymentVersion, - defaultVersioningBehavior: 'pinned', + defaultVersioningBehavior: 'PINNED', }, }); const worker2Promise = worker2.run(); @@ -67,7 +67,7 @@ test('Worker deployment based versioning', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: w3DeploymentVersion, - defaultVersioningBehavior: 'pinned', + defaultVersioningBehavior: 'PINNED', }, }); const worker3Promise = worker3.run(); @@ -153,7 +153,7 @@ test('Worker deployment based versioning with ramping', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: v1, - defaultVersioningBehavior: 'pinned', + defaultVersioningBehavior: 'PINNED', }, }); const worker1Promise = worker1.run(); @@ -167,7 +167,7 @@ test('Worker deployment based versioning with ramping', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: v2, - defaultVersioningBehavior: 'pinned', + defaultVersioningBehavior: 'PINNED', }, }); const worker2Promise = worker2.run(); @@ -258,7 +258,7 @@ test('Worker deployment with dynamic workflow on run', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version, - defaultVersioningBehavior: 'auto-upgrade', + defaultVersioningBehavior: 'AUTO_UPGRADE', }, }); @@ -310,7 +310,7 @@ test('Workflows can use default versioning behavior', async (t) => { workerDeploymentOptions: { useWorkerVersioning: true, version: workerV1, - defaultVersioningBehavior: 'pinned', + defaultVersioningBehavior: 'PINNED', }, }); diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index d60961992..54f0aa5de 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -13,10 +13,11 @@ import { SearchAttributePair, Priority, WorkerDeploymentVersion, + VersioningBehavior, } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; -import type { coresdk, temporal } from '@temporalio/proto'; +import type { coresdk } from '@temporalio/proto'; /** * Workflow Execution information @@ -637,5 +638,5 @@ export type UpdateHandlerOptions = { export interface ActivationCompletion { commands: coresdk.workflow_commands.IWorkflowCommand[]; usedInternalFlags: number[]; - versioningBehavior?: temporal.api.enums.v1.VersioningBehavior; + versioningBehavior?: VersioningBehavior; } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index ec996c52d..638da22a6 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -29,7 +29,7 @@ import { } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; -import { coresdk, temporal } from '@temporalio/proto'; +import type { coresdk, temporal } from '@temporalio/proto'; import { alea, RNG } from './alea'; import { RootCancellationScope } from './cancellation-scope'; import { UpdateScope } from './update-scope'; @@ -497,16 +497,10 @@ export class Activator implements ActivationHandler { } concludeActivation(): ActivationCompletion { - let versioningBehavior; - if (this.versioningBehavior === 'auto-upgrade') { - versioningBehavior = temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE; - } else if (this.versioningBehavior === 'pinned') { - versioningBehavior = temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED; - } return { commands: this.commands.splice(0), usedInternalFlags: [...this.knownFlags], - versioningBehavior, + versioningBehavior: this.versioningBehavior, }; } diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index a9db0cf9f..399e5c79b 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -3,9 +3,14 @@ * * @module */ -import { IllegalStateError, isWorkflowFunctionWithOptions } from '@temporalio/common'; +import { + encodeVersioningBehavior, + IllegalStateError, + isWorkflowFunctionWithOptions, + VersioningBehavior, +} from '@temporalio/common'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; -import { coresdk } from '@temporalio/proto'; +import { coresdk, temporal } from '@temporalio/proto'; import { disableStorage } from './cancellation-scope'; import { disableUpdateStorage } from './update-scope'; import { WorkflowInterceptorsFactory } from './interceptors'; @@ -209,7 +214,11 @@ export function concludeActivation(): coresdk.workflow_completion.IWorkflowActiv } return { runId: activator.info.runId, - successful: { ...activationCompletion, commands }, + successful: { + ...activationCompletion, + commands, + versioningBehavior: encodeVersioningBehavior(activationCompletion.versioningBehavior), + }, }; } finally { activator.rethrowSynchronously = false; From e89d4776dc5e86ff0c247ddc3ac873b363f425cc Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 17 Apr 2025 15:09:07 -0700 Subject: [PATCH 10/14] Test fixes --- packages/test/src/test-integration-split-one.ts | 1 + packages/test/src/test-sinks.ts | 3 ++- .../test/src/test-worker-deployment-versioning.ts | 15 +++++++++++---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/packages/test/src/test-integration-split-one.ts b/packages/test/src/test-integration-split-one.ts index 59366dc2c..225f0df5a 100644 --- a/packages/test/src/test-integration-split-one.ts +++ b/packages/test/src/test-integration-split-one.ts @@ -738,6 +738,7 @@ test.serial('Workflow can read WorkflowInfo', configMacro, async (t, config) => startTime: res.startTime, runStartTime: res.runStartTime, currentBuildId: res.currentBuildId, // eslint-disable-line deprecation/deprecation + currentDeploymentVersion: res.currentDeploymentVersion, // unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast unsafe: { isReplaying: false } as UnsafeWorkflowInfo, priority: {}, diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index e6036d601..a39bfcb1a 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -94,7 +94,7 @@ if (RUN_INTEGRATION_TESTS) { }); // Capture volatile values that are hard to predict - const { historySize, startTime, runStartTime, currentBuildId } = recordedCalls[0].info; // eslint-disable-line deprecation/deprecation + const { historySize, startTime, runStartTime, currentBuildId, currentDeploymentVersion } = recordedCalls[0].info; // eslint-disable-line deprecation/deprecation t.true(historySize > 300); const info: WorkflowInfo = { @@ -130,6 +130,7 @@ if (RUN_INTEGRATION_TESTS) { startTime, runStartTime, currentBuildId, + currentDeploymentVersion, // unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast unsafe: { isReplaying: false, diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index e5eeef8e4..8f5f2484a 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -18,7 +18,7 @@ const test = makeTestFunction({ workflowsPath: __filename }); test('Worker deployment based versioning', async (t) => { const taskQueue = 'worker-deployment-based-versioning-' + randomUUID(); const deploymentName = 'deployment-' + randomUUID(); - const client = t.context.env.client; + const { client, nativeConnection } = t.context.env; const w1DeploymentVersion = { buildId: '1.0', @@ -41,6 +41,7 @@ test('Worker deployment based versioning', async (t) => { version: w1DeploymentVersion, defaultVersioningBehavior: 'PINNED', }, + connection: nativeConnection, }); const worker1Promise = worker1.run(); worker1Promise.catch((err) => { @@ -55,6 +56,7 @@ test('Worker deployment based versioning', async (t) => { version: w2DeploymentVersion, defaultVersioningBehavior: 'PINNED', }, + connection: nativeConnection, }); const worker2Promise = worker2.run(); worker2Promise.catch((err) => { @@ -69,6 +71,7 @@ test('Worker deployment based versioning', async (t) => { version: w3DeploymentVersion, defaultVersioningBehavior: 'PINNED', }, + connection: nativeConnection, }); const worker3Promise = worker3.run(); worker3Promise.catch((err) => { @@ -136,7 +139,7 @@ test('Worker deployment based versioning', async (t) => { test('Worker deployment based versioning with ramping', async (t) => { const taskQueue = 'worker-deployment-based-ramping-' + randomUUID(); const deploymentName = 'deployment-ramping-' + randomUUID(); - const client = t.context.env.client; + const { client, nativeConnection } = t.context.env; const v1 = { buildId: '1.0', @@ -155,6 +158,7 @@ test('Worker deployment based versioning with ramping', async (t) => { version: v1, defaultVersioningBehavior: 'PINNED', }, + connection: nativeConnection, }); const worker1Promise = worker1.run(); worker1Promise.catch((err) => { @@ -169,6 +173,7 @@ test('Worker deployment based versioning with ramping', async (t) => { version: v2, defaultVersioningBehavior: 'PINNED', }, + connection: nativeConnection, }); const worker2Promise = worker2.run(); worker2Promise.catch((err) => { @@ -245,7 +250,7 @@ test('Worker deployment with dynamic workflow on run', async (t) => { const taskQueue = 'worker-deployment-dynamic-' + randomUUID(); const deploymentName = 'deployment-dynamic-' + randomUUID(); - const client = t.context.env.client; + const { client, nativeConnection } = t.context.env; const version = { buildId: '1.0', @@ -260,6 +265,7 @@ test('Worker deployment with dynamic workflow on run', async (t) => { version, defaultVersioningBehavior: 'AUTO_UPGRADE', }, + connection: nativeConnection, }); const workerPromise = worker.run(); @@ -297,7 +303,7 @@ test('Worker deployment with dynamic workflow on run', async (t) => { test('Workflows can use default versioning behavior', async (t) => { const taskQueue = 'task-queue-default-versioning-' + randomUUID(); const deploymentName = 'deployment-default-versioning-' + randomUUID(); - const client = t.context.env.client; + const { client, nativeConnection } = t.context.env; const workerV1 = { buildId: '1.0', @@ -312,6 +318,7 @@ test('Workflows can use default versioning behavior', async (t) => { version: workerV1, defaultVersioningBehavior: 'PINNED', }, + connection: nativeConnection, }); const workerPromise = worker.run(); From 5f7fed9d306b523ed0c87aab013c562f31456ded Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 23 Apr 2025 16:46:44 -0700 Subject: [PATCH 11/14] Options getter function --- .../common/src/workflow-definition-options.ts | 3 ++- .../src/deployment-versioning-v1/index.ts | 13 ++++++++++- .../src/test-worker-deployment-versioning.ts | 23 ++++++++++++++----- packages/workflow/src/internals.ts | 5 ++++ packages/workflow/src/worker-interface.ts | 21 +++++++++-------- packages/workflow/src/workflow.ts | 8 ++++--- 6 files changed, 53 insertions(+), 20 deletions(-) diff --git a/packages/common/src/workflow-definition-options.ts b/packages/common/src/workflow-definition-options.ts index 979d2b20a..63bda6258 100644 --- a/packages/common/src/workflow-definition-options.ts +++ b/packages/common/src/workflow-definition-options.ts @@ -8,11 +8,12 @@ export interface WorkflowDefinitionOptions { } type AsyncFunction = (...args: Args) => Promise; +export type WorkflowDefinitionOptionsOrGetter = WorkflowDefinitionOptions | (() => WorkflowDefinitionOptions); /** * A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}. */ export interface WorkflowFunctionWithOptions extends AsyncFunction { __temporal_is_workflow_function_with_options: true; - options: WorkflowDefinitionOptions; + options: WorkflowDefinitionOptionsOrGetter; } diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts index 7e11c1856..e9d6b08d0 100644 --- a/packages/test/src/deployment-versioning-v1/index.ts +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -1,4 +1,4 @@ -import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { setHandler, condition, defineWorkflowWithOptions, workflowInfo } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; defineWorkflowWithOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); @@ -15,3 +15,14 @@ export default defineWorkflowWithOptions({ versioningBehavior: 'PINNED' }, _defa async function _default(): Promise { return 'dynamic'; } + +defineWorkflowWithOptions(() => { + // Need to ensure accessing workflow context still works in here + workflowInfo(); + return { + versioningBehavior: 'PINNED', + }; +}, usesGetter); +export async function usesGetter(): Promise { + return 'usesGetter'; +} diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts index 8f5f2484a..0a75800d8 100644 --- a/packages/test/src/test-worker-deployment-versioning.ts +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -6,11 +6,12 @@ import assert from 'assert'; import { randomUUID } from 'crypto'; import asyncRetry from 'async-retry'; +import { ExecutionContext } from 'ava'; import { Client } from '@temporalio/client'; import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; import { temporal } from '@temporalio/proto'; import { Worker } from './helpers'; -import { makeTestFunction } from './helpers-integration'; +import { Context, makeTestFunction } from './helpers-integration'; import { unblockSignal, versionQuery } from './workflows'; const test = makeTestFunction({ workflowsPath: __filename }); @@ -242,7 +243,11 @@ test('Worker deployment based versioning with ramping', async (t) => { t.pass(); }); -test('Worker deployment with dynamic workflow on run', async (t) => { +async function testWorkerDeploymentWithDynamicBehavior( + t: ExecutionContext, + workflowName: string, + expectedResult: string +) { if (t.context.env.supportsTimeSkipping) { t.pass("Test Server doesn't support worker deployments"); return; @@ -276,17 +281,15 @@ test('Worker deployment with dynamic workflow on run', async (t) => { const describeResp = await waitUntilWorkerDeploymentVisible(client, version); await setCurrentDeploymentVersion(client, describeResp.conflictToken, version); - const wf = await client.workflow.start('cooldynamicworkflow', { + const wf = await client.workflow.start(workflowName, { taskQueue, workflowId: 'dynamic-workflow-versioning-' + randomUUID(), }); const result = await wf.result(); - assert.equal(result, 'dynamic'); + assert.equal(result, expectedResult); - // Check history for versioning behavior const history = await wf.fetchHistory(); - const hasPinnedVersioningBehavior = history.events!.some( (event) => event.workflowTaskCompletedEventAttributes && @@ -298,6 +301,14 @@ test('Worker deployment with dynamic workflow on run', async (t) => { worker.shutdown(); await workerPromise; t.pass(); +} + +test('Worker deployment with dynamic workflow static behavior', async (t) => { + await testWorkerDeploymentWithDynamicBehavior(t, 'cooldynamicworkflow', 'dynamic'); +}); + +test('Worker deployment with behavior in getter', async (t) => { + await testWorkerDeploymentWithDynamicBehavior(t, 'usesGetter', 'usesGetter'); }); test('Workflows can use default versioning behavior', async (t) => { diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 638da22a6..b8e86b2a2 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -22,6 +22,7 @@ import { fromPayloadsAtIndex, WorkflowFunctionWithOptions, VersioningBehavior, + WorkflowDefinitionOptions, } from '@temporalio/common'; import { decodeSearchAttributes, @@ -427,6 +428,7 @@ export class Activator implements ActivationHandler { public readonly registeredActivityNames: Set; public versioningBehavior?: VersioningBehavior; + public workflowDefinitionOptionsGetter?: () => WorkflowDefinitionOptions; constructor({ info, @@ -542,6 +544,9 @@ export class Activator implements ActivationHandler { ? this.failureConverter.failureToError(continuedFailure, this.payloadConverter) : undefined, })); + if (this.workflowDefinitionOptionsGetter) { + this.versioningBehavior = this.workflowDefinitionOptionsGetter().versioningBehavior; + } } public cancelWorkflow(_activation: coresdk.workflow_activation.ICancelWorkflow): void { diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 399e5c79b..25eaa8fb3 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -3,14 +3,9 @@ * * @module */ -import { - encodeVersioningBehavior, - IllegalStateError, - isWorkflowFunctionWithOptions, - VersioningBehavior, -} from '@temporalio/common'; +import { encodeVersioningBehavior, IllegalStateError, isWorkflowFunctionWithOptions } from '@temporalio/common'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; -import { coresdk, temporal } from '@temporalio/proto'; +import { coresdk } from '@temporalio/proto'; import { disableStorage } from './cancellation-scope'; import { disableUpdateStorage } from './update-scope'; import { WorkflowInterceptorsFactory } from './interceptors'; @@ -86,10 +81,18 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { if (isWorkflowFunctionWithOptions(workflowFn)) { activator.workflow = workflowFn; - activator.versioningBehavior = workflowFn.options.versioningBehavior; + if (typeof workflowFn.options === 'object') { + activator.versioningBehavior = workflowFn.options.versioningBehavior; + } else { + activator.workflowDefinitionOptionsGetter = workflowFn.options; + } } else if (isWorkflowFunctionWithOptions(defaultWorkflowFn)) { activator.workflow = defaultWorkflowFn; - activator.versioningBehavior = defaultWorkflowFn.options.versioningBehavior; + if (typeof defaultWorkflowFn.options === 'object') { + activator.versioningBehavior = defaultWorkflowFn.options.versioningBehavior; + } else { + activator.workflowDefinitionOptionsGetter = defaultWorkflowFn.options; + } } else if (typeof workflowFn === 'function') { activator.workflow = workflowFn; } else if (typeof defaultWorkflowFn === 'function') { diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index dc12df83b..499ed67f8 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -23,7 +23,7 @@ import { WorkflowUpdateValidatorType, SearchAttributeUpdatePair, compilePriority, - WorkflowDefinitionOptions, + WorkflowDefinitionOptionsOrGetter, WorkflowFunctionWithOptions, } from '@temporalio/common'; import { @@ -1641,13 +1641,15 @@ export function allHandlersFinished(): boolean { * } * ``` * - * @param options Options for the workflow defintion. + * @param options Options for the workflow defintion, or a function that returns options. If a + * function is provided, it will be called once just before the workflow function is called for the + * first time. * @param fn The workflow function. * @returns The same passed in workflow function, with the specified options applied. You can export * this function to make it available as a workflow function. */ export function defineWorkflowWithOptions( - options: WorkflowDefinitionOptions, + options: WorkflowDefinitionOptionsOrGetter, fn: (...args: A) => Promise ): WorkflowFunctionWithOptions { const wrappedFn = Object.assign(fn, { From 2f2e67f9c05529cc264c1df210612432f06caf44 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 28 Apr 2025 11:05:43 -0700 Subject: [PATCH 12/14] Review fixes --- packages/common/src/worker-deployments.ts | 2 +- .../common/src/workflow-definition-options.ts | 5 ++- packages/common/src/workflow-options.ts | 2 +- .../src/deployment-versioning-v1/index.ts | 10 +++--- .../src/deployment-versioning-v2/index.ts | 4 +-- .../src/deployment-versioning-v3/index.ts | 4 +-- packages/test/src/helpers-integration.ts | 2 +- packages/test/src/helpers.ts | 4 +-- packages/worker/src/utils.ts | 4 +-- packages/workflow/src/worker-interface.ts | 32 +++++++++++-------- packages/workflow/src/workflow.ts | 24 ++++++-------- 11 files changed, 45 insertions(+), 48 deletions(-) diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index b9091a8ef..36e8bbd49 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -1,4 +1,4 @@ -import { temporal } from '@temporalio/proto'; +import type { temporal } from '@temporalio/proto'; import { makeProtoEnumConverters } from './internal-workflow'; /** diff --git a/packages/common/src/workflow-definition-options.ts b/packages/common/src/workflow-definition-options.ts index 63bda6258..fe7aeac0e 100644 --- a/packages/common/src/workflow-definition-options.ts +++ b/packages/common/src/workflow-definition-options.ts @@ -1,7 +1,7 @@ import { VersioningBehavior } from './worker-deployments'; /** - * Options that can be used when defining a workflow via {@link defineWorkflowWithOptions}. + * Options that can be used when defining a workflow via {@link setWorkflowOptions}. */ export interface WorkflowDefinitionOptions { versioningBehavior?: VersioningBehavior; @@ -14,6 +14,5 @@ export type WorkflowDefinitionOptionsOrGetter = WorkflowDefinitionOptions | (() * A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}. */ export interface WorkflowFunctionWithOptions extends AsyncFunction { - __temporal_is_workflow_function_with_options: true; - options: WorkflowDefinitionOptionsOrGetter; + workflowDefinitionOptions: WorkflowDefinitionOptionsOrGetter; } diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index 4bb477f82..51e63f8bd 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -260,5 +260,5 @@ export function extractWorkflowType( /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions { if (obj == null) return false; - return obj.__temporal_is_workflow_function_with_options === true; + return Object.hasOwn(obj, 'workflowDefinitionOptions'); } diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts index e9d6b08d0..a2a965407 100644 --- a/packages/test/src/deployment-versioning-v1/index.ts +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -1,7 +1,7 @@ -import { setHandler, condition, defineWorkflowWithOptions, workflowInfo } from '@temporalio/workflow'; +import { setHandler, condition, setWorkflowOptions, workflowInfo } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; -defineWorkflowWithOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); +setWorkflowOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); export async function deploymentVersioning(): Promise { let doFinish = false; setHandler(unblockSignal, () => void (doFinish = true)); @@ -11,12 +11,12 @@ export async function deploymentVersioning(): Promise { } // Dynamic/default workflow handler -export default defineWorkflowWithOptions({ versioningBehavior: 'PINNED' }, _default); -async function _default(): Promise { +setWorkflowOptions({ versioningBehavior: 'PINNED' }, module.exports.default); +export default async function (): Promise { return 'dynamic'; } -defineWorkflowWithOptions(() => { +setWorkflowOptions(() => { // Need to ensure accessing workflow context still works in here workflowInfo(); return { diff --git a/packages/test/src/deployment-versioning-v2/index.ts b/packages/test/src/deployment-versioning-v2/index.ts index 87fd5863a..82e9885c9 100644 --- a/packages/test/src/deployment-versioning-v2/index.ts +++ b/packages/test/src/deployment-versioning-v2/index.ts @@ -1,7 +1,7 @@ -import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { setHandler, condition, setWorkflowOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; -defineWorkflowWithOptions({ versioningBehavior: 'PINNED' }, deploymentVersioning); +setWorkflowOptions({ versioningBehavior: 'PINNED' }, deploymentVersioning); export async function deploymentVersioning(): Promise { let doFinish = false; setHandler(unblockSignal, () => void (doFinish = true)); diff --git a/packages/test/src/deployment-versioning-v3/index.ts b/packages/test/src/deployment-versioning-v3/index.ts index b499779b2..a1e294b14 100644 --- a/packages/test/src/deployment-versioning-v3/index.ts +++ b/packages/test/src/deployment-versioning-v3/index.ts @@ -1,7 +1,7 @@ -import { setHandler, condition, defineWorkflowWithOptions } from '@temporalio/workflow'; +import { setHandler, condition, setWorkflowOptions } from '@temporalio/workflow'; import { unblockSignal, versionQuery } from '../workflows'; -defineWorkflowWithOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); +setWorkflowOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); export async function deploymentVersioning(): Promise { let doFinish = false; setHandler(unblockSignal, () => void (doFinish = true)); diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 743d1f821..5cb73758e 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -137,7 +137,7 @@ export function makeTestFunction(opts: { createTestContext: async (_t: ExecutionContext): Promise => { let env: TestWorkflowEnvironment; if (process.env.TEMPORAL_SERVICE_ADDRESS) { - env = await TestWorkflowEnvironment.createExistingServer({ + env = await TestWorkflowEnvironment.createFromExistingServer({ address: process.env.TEMPORAL_SERVICE_ADDRESS, }); } else { diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index d15bbea91..218be7169 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -215,10 +215,10 @@ export class TestWorkflowEnvironment extends RealTestWorkflowEnvironment { }); } - static async createExistingServer( + static async createFromExistingServer( opts?: ExistingServerTestWorkflowEnvironmentOptions ): Promise { - return RealTestWorkflowEnvironment.createExistingServer(opts); + return RealTestWorkflowEnvironment.createFromExistingServer(opts); } } diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index 2b7932508..5366c6f7b 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -33,12 +33,12 @@ export function convertToParentWorkflowType( export function convertDeploymentVersion( v: coresdk.common.IWorkerDeploymentVersion | null | undefined ): WorkerDeploymentVersion | undefined { - if (v == null) { + if (v == null || v.buildId == null) { return undefined; } return { - buildId: v.buildId ?? '', + buildId: v.buildId, deploymentName: v.deploymentName ?? '', }; } diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 25eaa8fb3..c80694c1f 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -79,24 +79,28 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { const workflowFn = mod[activator.info.workflowType]; const defaultWorkflowFn = mod['default']; - if (isWorkflowFunctionWithOptions(workflowFn)) { - activator.workflow = workflowFn; - if (typeof workflowFn.options === 'object') { - activator.versioningBehavior = workflowFn.options.versioningBehavior; + if (typeof workflowFn === 'function') { + if (isWorkflowFunctionWithOptions(workflowFn)) { + activator.workflow = workflowFn; + if (typeof workflowFn.workflowDefinitionOptions === 'object') { + activator.versioningBehavior = workflowFn.workflowDefinitionOptions.versioningBehavior; + } else { + activator.workflowDefinitionOptionsGetter = workflowFn.workflowDefinitionOptions; + } } else { - activator.workflowDefinitionOptionsGetter = workflowFn.options; + activator.workflow = workflowFn; } - } else if (isWorkflowFunctionWithOptions(defaultWorkflowFn)) { - activator.workflow = defaultWorkflowFn; - if (typeof defaultWorkflowFn.options === 'object') { - activator.versioningBehavior = defaultWorkflowFn.options.versioningBehavior; + } else if (typeof defaultWorkflowFn === 'function') { + if (isWorkflowFunctionWithOptions(defaultWorkflowFn)) { + activator.workflow = defaultWorkflowFn; + if (typeof defaultWorkflowFn.workflowDefinitionOptions === 'object') { + activator.versioningBehavior = defaultWorkflowFn.workflowDefinitionOptions.versioningBehavior; + } else { + activator.workflowDefinitionOptionsGetter = defaultWorkflowFn.workflowDefinitionOptions; + } } else { - activator.workflowDefinitionOptionsGetter = defaultWorkflowFn.options; + activator.workflow = defaultWorkflowFn; } - } else if (typeof workflowFn === 'function') { - activator.workflow = workflowFn; - } else if (typeof defaultWorkflowFn === 'function') { - activator.workflow = defaultWorkflowFn; } else { const details = workflowFn === undefined diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 499ed67f8..bd9810de0 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -1617,14 +1617,12 @@ export function allHandlersFinished(): boolean { } /** - * Can be used to alter or define workflow functions with certain options specified at definition - * time. In order to ensure that workflows are loaded properly by their name, you typically will not - * need to use the return value of this function. + * Can be used to alter workflow functions with certain options specified at definition time. * * @example * For example: * ```ts - * defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, myWorkflow); + * setWorkflowOptions({ versioningBehavior: 'PINNED' }, myWorkflow); * export async function myWorkflow(): Promise { * // Workflow code here * return "hi"; @@ -1634,29 +1632,25 @@ export function allHandlersFinished(): boolean { * @example * To annotate a default or dynamic workflow: * ```ts - * export default defineWorkflowWithOptions({ versioningBehavior: 'pinned' }, myDefaultWorkflow); - * async function myDefaultWorkflow(): Promise { + * export default async function (): Promise { * // Workflow code here * return "hi"; * } + * setWorkflowOptions({ versioningBehavior: 'PINNED' }, module.exports.default); * ``` * * @param options Options for the workflow defintion, or a function that returns options. If a * function is provided, it will be called once just before the workflow function is called for the - * first time. + * first time. It is safe to call {@link workflowInfo} inside such a function. * @param fn The workflow function. - * @returns The same passed in workflow function, with the specified options applied. You can export - * this function to make it available as a workflow function. */ -export function defineWorkflowWithOptions( +export function setWorkflowOptions( options: WorkflowDefinitionOptionsOrGetter, fn: (...args: A) => Promise -): WorkflowFunctionWithOptions { - const wrappedFn = Object.assign(fn, { - options, - __temporal_is_workflow_function_with_options: true as const, +) { + Object.assign(fn, { + workflowDefinitionOptions: options, }); - return wrappedFn; } export const stackTraceQuery = defineQuery('__stack_trace'); From 77b694c8e30c85d407bf6fcaf3e3e38f9900be7e Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 30 Apr 2025 10:49:15 -0700 Subject: [PATCH 13/14] Versioning override --- packages/client/src/workflow-options.ts | 40 ++++++++++++++++++++--- packages/common/src/worker-deployments.ts | 13 ++++++-- packages/workflow/src/workflow.ts | 3 +- 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/packages/client/src/workflow-options.ts b/packages/client/src/workflow-options.ts index 521fa6d37..cac4aa0d2 100644 --- a/packages/client/src/workflow-options.ts +++ b/packages/client/src/workflow-options.ts @@ -1,7 +1,14 @@ -import { CommonWorkflowOptions, SignalDefinition, WithWorkflowArgs, Workflow } from '@temporalio/common'; +import { + CommonWorkflowOptions, + SignalDefinition, + WithWorkflowArgs, + Workflow, + VersioningOverride, + toCanonicalString, +} from '@temporalio/common'; import { Duration, msOptionalToTs } from '@temporalio/common/lib/time'; import { Replace } from '@temporalio/common/lib/type-helpers'; -import { google } from '@temporalio/proto'; +import { google, temporal } from '@temporalio/proto'; export * from '@temporalio/common/lib/workflow-options'; @@ -38,9 +45,15 @@ export interface WorkflowOptions extends CommonWorkflowOptions { /** * Amount of time to wait before starting the workflow. - * */ startDelay?: Duration; + + /** + * Override the versioning behavior of the Workflow that is about to be started. + * + * @experimental Deployment based versioning is experimental and may change in the future. + */ + versioningOverride?: VersioningOverride; } export type WithCompiledWorkflowOptions = Replace< @@ -50,11 +63,13 @@ export type WithCompiledWorkflowOptions = Replace< workflowRunTimeout?: google.protobuf.IDuration; workflowTaskTimeout?: google.protobuf.IDuration; startDelay?: google.protobuf.IDuration; + versioningOverride?: temporal.api.workflow.v1.IVersioningOverride; } >; export function compileWorkflowOptions(options: T): WithCompiledWorkflowOptions { - const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, ...rest } = options; + const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, versioningOverride, ...rest } = + options; return { ...rest, @@ -62,6 +77,7 @@ export function compileWorkflowOptions(options: T): W workflowRunTimeout: msOptionalToTs(workflowRunTimeout), workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout), startDelay: msOptionalToTs(startDelay), + versioningOverride: versioningOverrideToProto(versioningOverride), }; } @@ -109,3 +125,19 @@ export interface WorkflowSignalWithStartOptionsWithArgs = WithWorkflowArgs; + +function versioningOverrideToProto( + vo: VersioningOverride | undefined +): temporal.api.workflow.v1.IVersioningOverride | undefined { + if (!vo) return undefined; + // TODO: Remove deprecated field assignments when versioning is non-experimental + if (vo === 'AUTO_UPGRADE') { + return { + behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + }; + } + return { + behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED, + pinnedVersion: toCanonicalString(vo.version), + }; +} diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts index 36e8bbd49..5b99287ad 100644 --- a/packages/common/src/worker-deployments.ts +++ b/packages/common/src/worker-deployments.ts @@ -22,8 +22,8 @@ export function toCanonicalString(version: WorkerDeploymentVersion): string { /** * Specifies when a workflow might move from a worker of one Build Id to another. * - * * 'pinned' - The workflow will be pinned to the current Build ID unless manually moved. - * * 'auto-upgrade' - The workflow will automatically move to the latest version (default Build ID + * * 'PINNED' - The workflow will be pinned to the current Build ID unless manually moved. + * * 'AUTO_UPGRADE' - The workflow will automatically move to the latest version (default Build ID * of the task queue) when the next task is dispatched. * * @experimental Deployment based versioning is experimental and may change in the future. @@ -48,3 +48,12 @@ export const [encodeVersioningBehavior, decodeVersioningBehavior] = makeProtoEnu } as const, 'VERSIONING_BEHAVIOR_' ); + +/** + * Represents versioning overrides. For example, when starting workflows. + * + * If set to 'AUTO_UPGRADE', the Workflow will run as if it is using {@link VersioningBehavior.AUTO_UPGRADE}. + * Otherwise, you select a pinned behavior, and a specific version to pin to. Currently only one + * such behavior exists, but more will be added in the future. + */ +export type VersioningOverride = { pinned_behavior: 'PINNED'; version: WorkerDeploymentVersion } | 'AUTO_UPGRADE'; diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index bd9810de0..d915d6ed7 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -24,7 +24,6 @@ import { SearchAttributeUpdatePair, compilePriority, WorkflowDefinitionOptionsOrGetter, - WorkflowFunctionWithOptions, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -1647,7 +1646,7 @@ export function allHandlersFinished(): boolean { export function setWorkflowOptions( options: WorkflowDefinitionOptionsOrGetter, fn: (...args: A) => Promise -) { +): void { Object.assign(fn, { workflowDefinitionOptions: options, }); From 3ee553e1e107cf1cf8e6d124305e73981dbbf756 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 8 May 2025 17:20:45 -0700 Subject: [PATCH 14/14] Deal with bridge refactor merge conflicts --- packages/core-bridge/bridge-macros/src/lib.rs | 3 + packages/core-bridge/src/worker.rs | 118 ++++++++++++++++-- packages/core-bridge/ts/native.ts | 34 ++++- packages/test/src/test-bridge.ts | 1 + packages/testing/src/index.ts | 1 + .../src/testing-workflow-environment.ts | 83 ++++++++---- packages/worker/src/worker-options.ts | 74 +++++++---- packages/worker/src/worker-tuner.ts | 34 +++-- packages/worker/src/worker.ts | 7 +- 9 files changed, 289 insertions(+), 66 deletions(-) diff --git a/packages/core-bridge/bridge-macros/src/lib.rs b/packages/core-bridge/bridge-macros/src/lib.rs index f3e31a682..6c338b346 100644 --- a/packages/core-bridge/bridge-macros/src/lib.rs +++ b/packages/core-bridge/bridge-macros/src/lib.rs @@ -9,6 +9,9 @@ use proc_macro::TokenStream; use syn::{DeriveInput, parse_macro_input}; /// Procedural macro for defining bridge types with compile-time field name conversion +/// +/// Note that enum types must all be defined on the JS side as objects with a `type` field which +/// is the kebab-case representation of the enum variant. #[proc_macro_derive(TryFromJs)] pub fn try_from_js(input: TokenStream) -> TokenStream { let input = parse_macro_input!(input as DeriveInput); diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 14938fa3d..7a418beca 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -32,7 +32,7 @@ use crate::{ runtime::{Runtime, RuntimeExt}, }; -pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> { +pub fn init(cx: &mut ModuleContext) -> NeonResult<()> { cx.export_function("newWorker", worker_new)?; cx.export_function("workerValidate", worker_validate)?; @@ -412,19 +412,27 @@ mod config { api::worker::{ ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, - WorkflowSlotKind, + WorkerDeploymentOptions as CoreWorkerDeploymentOptions, + WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind, }, + protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior, }; - use bridge_macros::TryFromJs; - use super::custom_slot_supplier::CustomSlotSupplierOptions; + use crate::helpers::TryIntoJs; + use bridge_macros::TryFromJs; + use neon::context::Context; + use neon::object::Object; + use neon::prelude::JsResult; + use neon::types::JsObject; + use temporal_sdk_core::api::worker::WorkerVersioningStrategy; #[derive(TryFromJs)] pub struct BridgeWorkerOptions { identity: String, build_id: String, use_versioning: bool, + worker_deployment_options: Option, task_queue: String, namespace: String, tuner: WorkerTuner, @@ -453,14 +461,44 @@ mod config { }, } + #[derive(TryFromJs)] + pub struct WorkerDeploymentOptions { + version: WorkerDeploymentVersion, + use_worker_versioning: bool, + default_versioning_behavior: VersioningBehavior, + } + + #[derive(TryFromJs)] + pub struct WorkerDeploymentVersion { + build_id: String, + deployment_name: String, + } + + #[derive(TryFromJs)] + pub enum VersioningBehavior { + Pinned, + AutoUpgrade, + } + impl BridgeWorkerOptions { pub(crate) fn into_core_config(self) -> Result { // Set all other options let mut builder = WorkerConfigBuilder::default(); builder .client_identity_override(Some(self.identity)) - .worker_build_id(self.build_id) - .use_worker_versioning(self.use_versioning) + .versioning_strategy({ + if let Some(dopts) = self.worker_deployment_options { + WorkerVersioningStrategy::WorkerDeploymentBased(dopts.into()) + } else if self.use_versioning { + WorkerVersioningStrategy::LegacyBuildIdBased { + build_id: self.build_id, + } + } else { + WorkerVersioningStrategy::None { + build_id: self.build_id, + } + } + }) .task_queue(self.task_queue) .namespace(self.namespace) .tuner(self.tuner.into_core_config()?) @@ -483,13 +521,13 @@ mod config { fn from(val: PollerBehavior) -> Self { match val { PollerBehavior::SimpleMaximum { maximum } => { - CorePollerBehavior::SimpleMaximum(maximum) + Self::SimpleMaximum(maximum) } PollerBehavior::Autoscaling { minimum, maximum, initial, - } => CorePollerBehavior::Autoscaling { + } => Self::Autoscaling { minimum, maximum, initial, @@ -498,6 +536,56 @@ mod config { } } + impl From for CoreWorkerDeploymentOptions { + fn from(val: WorkerDeploymentOptions) -> Self { + Self { + version: val.version.into(), + use_worker_versioning: val.use_worker_versioning, + default_versioning_behavior: Some(val.default_versioning_behavior.into()), + } + } + } + + impl From for CoreWorkerDeploymentVersion { + fn from(val: WorkerDeploymentVersion) -> Self { + Self { + build_id: val.build_id, + deployment_name: val.deployment_name, + } + } + } + + impl From for WorkerDeploymentVersion { + fn from(val: CoreWorkerDeploymentVersion) -> Self { + Self { + build_id: val.build_id, + deployment_name: val.deployment_name, + } + } + } + + impl TryIntoJs for WorkerDeploymentVersion { + type Output = JsObject; + + fn try_into_js<'cx>(self, cx: &mut impl Context<'cx>) -> JsResult<'cx, Self::Output> { + let obj = cx.empty_object(); + let bid = self.build_id.try_into_js(cx)?; + obj.set(cx, "buildId", bid)?; + let dn = self.deployment_name.try_into_js(cx)?; + obj.set(cx, "deploymentName", dn)?; + Ok(obj) + } + } + + impl From for CoreVersioningBehavior { + fn from(val: VersioningBehavior) -> Self { + match val { + VersioningBehavior::Pinned => Self::Pinned, + VersioningBehavior::AutoUpgrade => Self::AutoUpgrade, + } + } + } + #[derive(TryFromJs)] #[allow(clippy::struct_field_names)] pub(super) struct WorkerTuner { @@ -613,7 +701,7 @@ mod custom_slot_supplier { use tracing::warn; use crate::helpers::*; - + use crate::worker::config::WorkerDeploymentVersion; // Custom Slot Supplier //////////////////////////////////////////////////////////////////////////// pub(super) struct SlotSupplierBridge { @@ -639,7 +727,10 @@ mod custom_slot_supplier { slot_type: SK::kind().into(), task_queue: ctx.task_queue().to_string(), worker_identity: ctx.worker_identity().to_string(), - worker_build_id: ctx.worker_build_id().to_string(), + worker_deployment_version: ctx + .worker_deployment_version() + .clone() + .map(Into::into), is_sticky: ctx.is_sticky(), }; @@ -680,7 +771,10 @@ mod custom_slot_supplier { slot_type: SK::kind().into(), task_queue: ctx.task_queue().to_string(), worker_identity: ctx.worker_identity().to_string(), - worker_build_id: ctx.worker_build_id().to_string(), + worker_deployment_version: ctx + .worker_deployment_version() + .clone() + .map(Into::into), is_sticky: ctx.is_sticky(), }; @@ -809,7 +903,7 @@ mod custom_slot_supplier { slot_type: SlotKindType, task_queue: String, worker_identity: String, - worker_build_id: String, + worker_deployment_version: Option, is_sticky: bool, } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 539a8ce4d..90ec514f7 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -41,6 +41,7 @@ export type JsonString<_T> = string; //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newRuntime(telemOptions: RuntimeOptions): Runtime; + export declare function runtimeShutdown(runtime: Runtime): void; export interface Runtime { @@ -98,9 +99,13 @@ export interface OtelMetricsExporterOptions { //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newClient(runtime: Runtime, clientOptions: ClientOptions): Promise; + export declare function clientUpdateHeaders(client: Client, headers: Record): void; + export declare function clientUpdateApiKey(client: Client, apiKey: string): void; + export declare function clientSendRequest(client: Client, call: RpcCall): Promise; + export declare function clientClose(client: Client): void; export interface Client { @@ -155,16 +160,21 @@ export interface RpcCall { //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newWorker(client: Client, workerOptions: WorkerOptions): Worker; + export declare function workerValidate(worker: Worker): Promise; export declare function workerPollWorkflowActivation(worker: Worker): Promise; + export declare function workerCompleteWorkflowActivation(worker: Worker, result: Buffer): Promise; export declare function workerPollActivityTask(worker: Worker): Promise; + export declare function workerCompleteActivityTask(worker: Worker, result: Buffer): Promise; + export declare function workerRecordActivityHeartbeat(worker: Worker, heartbeat: Buffer): void; export declare function workerInitiateShutdown(worker: Worker): void; + export declare function workerFinalizeShutdown(worker: Worker): Promise; export interface Worker { @@ -175,6 +185,7 @@ export interface WorkerOptions { identity: string; buildId: string; useVersioning: boolean; + workerDeploymentOptions: Option; taskQueue: string; namespace: string; tuner: WorkerTunerOptions; @@ -203,6 +214,19 @@ export type PollerBehavior = initial: number; }; +export type WorkerDeploymentOptions = { + version: WorkerDeploymentVersion; + useWorkerVersioning: boolean; + defaultVersioningBehavior: VersioningBehavior; +}; + +export type WorkerDeploymentVersion = { + buildId: string; + deploymentName: string; +}; + +export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' }; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Worker Tuner //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -238,9 +262,13 @@ interface ResourceBasedTunerOptions { export interface CustomSlotSupplierOptions { type: 'custom'; + reserveSlot(ctx: SlotReserveContext, abortSignal: AbortSignal): Promise; + tryReserveSlot(ctx: SlotReserveContext): Option; + markSlotUsed(ctx: SlotMarkUsedContext): void; + releaseSlot(ctx: SlotReleaseContext): void; } @@ -268,7 +296,7 @@ export interface SlotReserveContext { slotType: SlotInfo['type']; taskQueue: string; workerIdentity: string; - workerBuildId: string; + workerDeploymentVersion: Option; isSticky: boolean; } @@ -290,7 +318,9 @@ export interface SlotPermit {} //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newReplayWorker(runtime: Runtime, workerOptions: WorkerOptions): [Worker, HistoryPusher]; + export declare function pushHistory(pusher: HistoryPusher, workflowId: string, history: Buffer): Promise; + export declare function closeHistoryStream(pusher: HistoryPusher): void; export interface HistoryPusher { @@ -302,7 +332,9 @@ export interface HistoryPusher { //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newEphemeralServer(runtime: Runtime, config: EphemeralServerConfig): Promise; + export declare function ephemeralServerGetTarget(server: EphemeralServer): string; + export declare function ephemeralServerShutdown(server: EphemeralServer): Promise; export interface EphemeralServer { diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index 6458f6d7c..04d930084 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -260,6 +260,7 @@ const GenericConfigs = { taskQueue: 'default', identity: 'test-worker', buildId: 'test-build-id', + workerDeploymentOptions: null, useVersioning: false, namespace: 'default', tuner: { diff --git a/packages/testing/src/index.ts b/packages/testing/src/index.ts index d00594ce4..8e8077e5c 100644 --- a/packages/testing/src/index.ts +++ b/packages/testing/src/index.ts @@ -14,6 +14,7 @@ export { TestWorkflowEnvironment, type LocalTestWorkflowEnvironmentOptions, type TimeSkippingTestWorkflowEnvironmentOptions, + type ExistingServerTestWorkflowEnvironmentOptions, } from './testing-workflow-environment'; export { diff --git a/packages/testing/src/testing-workflow-environment.ts b/packages/testing/src/testing-workflow-environment.ts index 9cfe4f91a..be78ea7ec 100644 --- a/packages/testing/src/testing-workflow-environment.ts +++ b/packages/testing/src/testing-workflow-environment.ts @@ -1,5 +1,5 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import -import { AsyncCompletionClient, Client, WorkflowClient } from '@temporalio/client'; +import { AsyncCompletionClient, Client, WorkflowClient, ClientOptions } from '@temporalio/client'; import { Duration, TypedSearchAttributes } from '@temporalio/common'; import { msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time'; import { NativeConnection, Runtime } from '@temporalio/worker'; @@ -25,6 +25,15 @@ export type TimeSkippingTestWorkflowEnvironmentOptions = { client?: ClientOptionsForTestEnv; }; +/** + * Options for {@link TestWorkflowEnvironment.createExistingServer} + */ +export type ExistingServerTestWorkflowEnvironmentOptions = { + /** If not set, defaults to localhost:7233 */ + address?: string; + client?: ClientOptions; +}; + /** * An execution environment for running Workflow integration tests. * @@ -72,7 +81,7 @@ export class TestWorkflowEnvironment { private readonly runtime: Runtime, public readonly options: TestWorkflowEnvironmentOptionsWithDefaults, public readonly supportsTimeSkipping: boolean, - protected readonly server: native.EphemeralServer, + protected readonly server: native.EphemeralServer | 'existing', connection: Connection, nativeConnection: NativeConnection, namespace: string | undefined @@ -107,9 +116,9 @@ export class TestWorkflowEnvironment { * environment, not to the workflow under test. We highly recommend running tests serially when using a single * environment or creating a separate environment per test. * - * By default, the latest release of the Test Serveer will be downloaded and cached to a temporary directory + * By default, the latest release of the Test Server will be downloaded and cached to a temporary directory * (e.g. `$TMPDIR/temporal-test-server-sdk-typescript-*` or `%TEMP%/temporal-test-server-sdk-typescript-*.exe`). Note - * that existing cached binairies will be reused without validation that they are still up-to-date, until the SDK + * that existing cached binaries will be reused without validation that they are still up-to-date, until the SDK * itself is updated. Alternatively, a specific version number of the Test Server may be provided, or the path to an * existing Test Server binary may be supplied; see {@link LocalTestWorkflowEnvironmentOptions.server.executable}. * @@ -141,7 +150,7 @@ export class TestWorkflowEnvironment { * * By default, the latest release of the CLI will be downloaded and cached to a temporary directory * (e.g. `$TMPDIR/temporal-sdk-typescript-*` or `%TEMP%/temporal-sdk-typescript-*.exe`). Note that existing cached - * binairies will be reused without validation that they are still up-to-date, until the SDK itself is updated. + * binaries will be reused without validation that they are still up-to-date, until the SDK itself is updated. * Alternatively, a specific version number of the CLI may be provided, or the path to an existing CLI binary may be * supplied; see {@link LocalTestWorkflowEnvironmentOptions.server.executable}. * @@ -158,6 +167,22 @@ export class TestWorkflowEnvironment { }); } + /** + * Create a new test environment using an existing server. You must already be running a server, which the test + * environment will connect to. + */ + static async createFromExistingServer( + opts?: ExistingServerTestWorkflowEnvironmentOptions + ): Promise { + return await this.create({ + server: { type: 'existing' }, + client: opts?.client, + namespace: opts?.client?.namespace ?? 'default', + supportsTimeSkipping: false, + address: opts?.address, + }); + } + /** * Create a new test environment */ @@ -165,25 +190,33 @@ export class TestWorkflowEnvironment { opts: TestWorkflowEnvironmentOptions & { supportsTimeSkipping: boolean; namespace?: string; + address?: string; } ): Promise { const { supportsTimeSkipping, namespace, ...rest } = opts; const optsWithDefaults = addDefaults(filterNullAndUndefined(rest)); - // Add search attributes to CLI server arguments - if ('searchAttributes' in optsWithDefaults.server && optsWithDefaults.server.searchAttributes) { - let newArgs: string[] = []; - for (const { name, type } of optsWithDefaults.server.searchAttributes) { - newArgs.push('--search-attribute'); - newArgs.push(`${name}=${TypedSearchAttributes.toMetadataType(type)}`); + let address: string; + const runtime = Runtime.instance(); + let server: native.EphemeralServer | 'existing'; + if (optsWithDefaults.server.type !== 'existing') { + // Add search attributes to CLI server arguments + if ('searchAttributes' in optsWithDefaults.server && optsWithDefaults.server.searchAttributes) { + let newArgs: string[] = []; + for (const { name, type } of optsWithDefaults.server.searchAttributes) { + newArgs.push('--search-attribute'); + newArgs.push(`${name}=${TypedSearchAttributes.toMetadataType(type)}`); + } + newArgs = newArgs.concat(optsWithDefaults.server.extraArgs ?? []); + optsWithDefaults.server.extraArgs = newArgs; } - newArgs = newArgs.concat(optsWithDefaults.server.extraArgs ?? []); - optsWithDefaults.server.extraArgs = newArgs; - } - const runtime = Runtime.instance(); - const server = await runtime.createEphemeralServer(toNativeEphemeralServerConfig(optsWithDefaults.server)); - const address = native.ephemeralServerGetTarget(server); + server = await runtime.createEphemeralServer(toNativeEphemeralServerConfig(optsWithDefaults.server)); + address = native.ephemeralServerGetTarget(server); + } else { + address = opts.address ?? 'localhost:7233'; + server = 'existing'; + } const nativeConnection = await NativeConnection.connect({ address }); const connection = await Connection.connect({ address }); @@ -203,16 +236,18 @@ export class TestWorkflowEnvironment { console.error(e); /* ignore */ }); - await this.runtime.shutdownEphemeralServer(this.server).catch((e) => { - console.error(e); - /* ignore */ - }); + if (this.server !== 'existing') { + await this.runtime.shutdownEphemeralServer(this.server).catch((e) => { + console.error(e); + /* ignore */ + }); + } } /** * Wait for `durationMs` in "server time". * - * This awaits using regular setTimeout in regular environments, or manually skips time in time-skipping environments. + * This awaits using regular setTimeout in regular environments or manually skips time in time-skipping environments. * * Useful for simulating events far into the future like completion of long running activities. * @@ -281,10 +316,12 @@ export class TestWorkflowEnvironment { * Options for {@link TestWorkflowEnvironment.create} */ type TestWorkflowEnvironmentOptions = { - server: DevServerConfig | TimeSkippingServerConfig; + server: DevServerConfig | TimeSkippingServerConfig | ExistingServerConfig; client?: ClientOptionsForTestEnv; }; +type ExistingServerConfig = { type: 'existing' }; + type TestWorkflowEnvironmentOptionsWithDefaults = Required; function addDefaults(opts: TestWorkflowEnvironmentOptions): TestWorkflowEnvironmentOptionsWithDefaults { diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 430980492..354afd188 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -67,7 +67,7 @@ export interface WorkerOptions { * @default `@temporalio/worker` package name and version + checksum of workflow bundle's code * * @experimental The Worker Versioning API is still being designed. Major changes are expected. - * @deprecated Use {@link deploymentVersion} instead. + * @deprecated Use {@link workerDeploymentOptions} instead. */ buildId?: string; @@ -79,7 +79,7 @@ export interface WorkerOptions { * For more information, see https://docs.temporal.io/workers#worker-versioning * * @experimental The Worker Versioning API is still being designed. Major changes are expected. - * @deprecated Use {@link deploymentVersion} instead. + * @deprecated Use {@link workerDeploymentOptions} instead. */ useVersioning?: boolean; @@ -88,23 +88,7 @@ export interface WorkerOptions { * * @experimental Deployment based versioning is still experimental. */ - workerDeploymentOptions?: { - /** - * The deployment version of the worker. - */ - version: WorkerDeploymentVersion; - - /** - * Whether to use deployment-based worker versioning. - */ - useWorkerVersioning: boolean; - - /** - * The default versioning behavior to use for all workflows on this worker. Specifying a default - * behavior is required, - */ - defaultVersioningBehavior: VersioningBehavior; - }; + workerDeploymentOptions?: WorkerDeploymentOptions; /** * The namespace this worker will connect to @@ -586,6 +570,30 @@ export interface PollerBehaviorSimpleMaximum { maximum?: number; } +/** + * Allows specifying the deployment version of the worker and whether to use deployment-based + * worker versioning. + * + * @experimental Deployment based versioning is still experimental. + */ +export type WorkerDeploymentOptions = { + /** + * The deployment version of the worker. + */ + version: WorkerDeploymentVersion; + + /** + * Whether to use deployment-based worker versioning. + */ + useWorkerVersioning: boolean; + + /** + * The default versioning behavior to use for all workflows on this worker. Specifying a default + * behavior is required. + */ + defaultVersioningBehavior: VersioningBehavior; +}; + // Replay Worker /////////////////////////////////////////////////////////////////////////////////// /** @@ -955,8 +963,9 @@ export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): Co export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { return { identity: opts.identity, - buildId: opts.buildId, - useVersioning: opts.useVersioning, + buildId: opts.buildId, // eslint-disable-line deprecation/deprecation + useVersioning: opts.useVersioning, // eslint-disable-line deprecation/deprecation + workerDeploymentOptions: toNativeDeploymentOptions(opts.workerDeploymentOptions), taskQueue: opts.taskQueue, namespace: opts.namespace, tuner: opts.tuner, @@ -993,6 +1002,29 @@ export function toNativeTaskPollerBehavior(behavior: Required): } } +function toNativeDeploymentOptions(options?: WorkerDeploymentOptions): native.WorkerDeploymentOptions | null { + if (options === undefined) { + return null; + } + let vb: native.VersioningBehavior; + switch (options.defaultVersioningBehavior) { + case 'PINNED': + vb = { type: 'pinned' }; + break; + case 'AUTO_UPGRADE': + vb = { type: 'auto-upgrade' }; + break; + default: + options.defaultVersioningBehavior satisfies never; + throw new Error(`Unknown versioning behavior: ${options.defaultVersioningBehavior}`); + } + return { + version: options.version, + useWorkerVersioning: options.useWorkerVersioning, + defaultVersioningBehavior: vb, + }; +} + // Utils /////////////////////////////////////////////////////////////////////////////////////////// function isSet(env: string | undefined): boolean { diff --git a/packages/worker/src/worker-tuner.ts b/packages/worker/src/worker-tuner.ts index 303ce3f60..72e95e934 100644 --- a/packages/worker/src/worker-tuner.ts +++ b/packages/worker/src/worker-tuner.ts @@ -1,6 +1,6 @@ import { native } from '@temporalio/core-bridge'; import { Duration, msToNumber } from '@temporalio/common/lib/time'; -import { Logger } from '@temporalio/common'; +import { Logger, WorkerDeploymentVersion } from '@temporalio/common'; /** * A worker tuner allows the customization of the performance characteristics of workers by @@ -210,6 +210,11 @@ export interface LocalActivitySlotInfo { // eslint-disable-next-line @typescript-eslint/no-empty-object-type export interface SlotPermit {} +/** + * Context for reserving a slot. + * + * @experimental Worker Tuner is an experimental feature and may be subject to change. + */ export interface SlotReserveContext { /** * The type of slot trying to be reserved @@ -231,6 +236,11 @@ export interface SlotReserveContext { */ workerBuildId: string; + /** + * The deployment version of the worker that is requesting the reservation + */ + workerDeploymentVersion?: WorkerDeploymentVersion; + /** * True iff this is a reservation for a sticky poll for a workflow task */ @@ -391,7 +401,7 @@ function addResourceBasedSlotDefaults( } } -class NativeifiedCustomSlotSupplier implements CustomSlotSupplier { +class NativeifiedCustomSlotSupplier implements native.CustomSlotSupplierOptions { readonly type = 'custom'; constructor( @@ -404,14 +414,18 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu this.releaseSlot = this.releaseSlot.bind(this); } - async reserveSlot(ctx: SlotReserveContext, abortSignal: AbortSignal): Promise { + async reserveSlot(ctx: native.SlotReserveContext, abortSignal: AbortSignal): Promise { + if (ctx.slotType === 'nexus') { + throw new Error('nexus not yet supported in slot suppliers'); + } try { const result = await this.supplier.reserveSlot( { slotType: ctx.slotType, taskQueue: ctx.taskQueue, workerIdentity: ctx.workerIdentity, - workerBuildId: ctx.workerBuildId, + workerBuildId: ctx.workerDeploymentVersion?.buildId ?? '', + workerDeploymentVersion: ctx.workerDeploymentVersion ?? undefined, isSticky: ctx.isSticky, }, abortSignal @@ -425,13 +439,17 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu } } - tryReserveSlot(ctx: SlotReserveContext): SlotPermit | null { + tryReserveSlot(ctx: native.SlotReserveContext): SlotPermit | null { + if (ctx.slotType === 'nexus') { + throw new Error('nexus not yet supported in slot suppliers'); + } try { const result = this.supplier.tryReserveSlot({ slotType: ctx.slotType, taskQueue: ctx.taskQueue, workerIdentity: ctx.workerIdentity, - workerBuildId: ctx.workerBuildId, + workerBuildId: ctx.workerDeploymentVersion?.buildId ?? '', + workerDeploymentVersion: ctx.workerDeploymentVersion ?? undefined, isSticky: ctx.isSticky, }); return result ?? null; @@ -441,7 +459,7 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu } } - markSlotUsed(ctx: SlotMarkUsedContext): void { + markSlotUsed(ctx: native.SlotMarkUsedContext): void { try { this.supplier.markSlotUsed({ slotInfo: ctx.slotInfo, @@ -452,7 +470,7 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu } } - releaseSlot(ctx: SlotReleaseContext): void { + releaseSlot(ctx: native.SlotReleaseContext): void { try { this.supplier.releaseSlot({ slotInfo: ctx.slotInfo ?? undefined, diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index d50b63490..d93a058cf 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -69,7 +69,12 @@ import { } from './replay'; import { History, Runtime } from './runtime'; import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; -import { byteArrayToBuffer, convertDeploymentVersion, convertToParentWorkflowType, convertToRootWorkflowType } from './utils'; +import { + byteArrayToBuffer, + convertDeploymentVersion, + convertToParentWorkflowType, + convertToRootWorkflowType, +} from './utils'; import { CompiledWorkerOptions, CompiledWorkerOptionsWithBuildId,