diff --git a/package-lock.json b/package-lock.json index 9b3910ae9..aa137056b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14997,10 +14997,11 @@ } }, "node_modules/protobufjs": { - "version": "7.2.5", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.2.5.tgz", - "integrity": "sha512-gGXRSXvxQ7UiPgfw8gevrfRWcTlSbOFg+p/N+JVJEK5VhueL2miT6qTymqAmjr1Q5WbOCyJbyrk6JfWKwlFn6A==", + "version": "7.5.1", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.5.1.tgz", + "integrity": "sha512-3qx3IRjR9WPQKagdwrKjO3Gu8RgQR2qqw+1KnigWhoVjFqegIj1K3bP11sGqhxrO46/XL7lekuG4jmjL+4cLsw==", "hasInstallScript": true, + "license": "BSD-3-Clause", "dependencies": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2", @@ -15020,9 +15021,10 @@ } }, "node_modules/protobufjs-cli": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/protobufjs-cli/-/protobufjs-cli-1.1.2.tgz", - "integrity": "sha512-8ivXWxT39gZN4mm4ArQyJrRgnIwZqffBWoLDsE21TmMcKI3XwJMV4lEF2WU02C4JAtgYYc2SfJIltelD8to35g==", + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/protobufjs-cli/-/protobufjs-cli-1.1.3.tgz", + "integrity": "sha512-MqD10lqF+FMsOayFiNOdOGNlXc4iKDCf0ZQPkPR+gizYh9gqUeGTWulABUCdI+N67w5RfJ6xhgX4J8pa8qmMXQ==", + "license": "BSD-3-Clause", "dependencies": { "chalk": "^4.0.0", "escodegen": "^1.13.0", @@ -19351,11 +19353,11 @@ "license": "MIT", "dependencies": { "long": "^5.2.3", - "protobufjs": "^7.2.5" + "protobufjs": "^7.5.1" }, "devDependencies": { "glob": "^10.3.10", - "protobufjs-cli": "^1.1.2" + "protobufjs-cli": "^1.1.3" }, "engines": { "node": ">= 18.0.0" @@ -19393,8 +19395,8 @@ "istanbul-lib-coverage": "^3.2.2", "long": "^5.2.3", "node-fetch": "^2.7.0", - "protobufjs": "^7.2.5", - "protobufjs-cli": "^1.1.2", + "protobufjs": "^7.5.1", + "protobufjs-cli": "^1.1.3", "rxjs": "7.8.1", "stack-utils": "^2.0.6", "uuid": "^9.0.1" @@ -21523,8 +21525,8 @@ "requires": { "glob": "^10.3.10", "long": "^5.2.3", - "protobufjs": "^7.2.5", - "protobufjs-cli": "^1.1.2" + "protobufjs": "^7.5.1", + "protobufjs-cli": "^1.1.3" } }, "@temporalio/test": { @@ -21567,8 +21569,8 @@ "node-fetch": "^2.7.0", "npm-run-all": "^4.1.5", "pidusage": "^3.0.2", - "protobufjs": "^7.2.5", - "protobufjs-cli": "^1.1.2", + "protobufjs": "^7.5.1", + "protobufjs-cli": "^1.1.3", "rxjs": "7.8.1", "stack-utils": "^2.0.6", "uuid": "^9.0.1" @@ -30593,9 +30595,9 @@ } }, "protobufjs": { - "version": "7.2.5", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.2.5.tgz", - "integrity": "sha512-gGXRSXvxQ7UiPgfw8gevrfRWcTlSbOFg+p/N+JVJEK5VhueL2miT6qTymqAmjr1Q5WbOCyJbyrk6JfWKwlFn6A==", + "version": "7.5.1", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.5.1.tgz", + "integrity": "sha512-3qx3IRjR9WPQKagdwrKjO3Gu8RgQR2qqw+1KnigWhoVjFqegIj1K3bP11sGqhxrO46/XL7lekuG4jmjL+4cLsw==", "requires": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2", @@ -30612,9 +30614,9 @@ } }, "protobufjs-cli": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/protobufjs-cli/-/protobufjs-cli-1.1.2.tgz", - "integrity": "sha512-8ivXWxT39gZN4mm4ArQyJrRgnIwZqffBWoLDsE21TmMcKI3XwJMV4lEF2WU02C4JAtgYYc2SfJIltelD8to35g==", + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/protobufjs-cli/-/protobufjs-cli-1.1.3.tgz", + "integrity": "sha512-MqD10lqF+FMsOayFiNOdOGNlXc4iKDCf0ZQPkPR+gizYh9gqUeGTWulABUCdI+N67w5RfJ6xhgX4J8pa8qmMXQ==", "requires": { "chalk": "^4.0.0", "escodegen": "^1.13.0", diff --git a/packages/client/src/workflow-options.ts b/packages/client/src/workflow-options.ts index 521fa6d37..cac4aa0d2 100644 --- a/packages/client/src/workflow-options.ts +++ b/packages/client/src/workflow-options.ts @@ -1,7 +1,14 @@ -import { CommonWorkflowOptions, SignalDefinition, WithWorkflowArgs, Workflow } from '@temporalio/common'; +import { + CommonWorkflowOptions, + SignalDefinition, + WithWorkflowArgs, + Workflow, + VersioningOverride, + toCanonicalString, +} from '@temporalio/common'; import { Duration, msOptionalToTs } from '@temporalio/common/lib/time'; import { Replace } from '@temporalio/common/lib/type-helpers'; -import { google } from '@temporalio/proto'; +import { google, temporal } from '@temporalio/proto'; export * from '@temporalio/common/lib/workflow-options'; @@ -38,9 +45,15 @@ export interface WorkflowOptions extends CommonWorkflowOptions { /** * Amount of time to wait before starting the workflow. - * */ startDelay?: Duration; + + /** + * Override the versioning behavior of the Workflow that is about to be started. + * + * @experimental Deployment based versioning is experimental and may change in the future. + */ + versioningOverride?: VersioningOverride; } export type WithCompiledWorkflowOptions = Replace< @@ -50,11 +63,13 @@ export type WithCompiledWorkflowOptions = Replace< workflowRunTimeout?: google.protobuf.IDuration; workflowTaskTimeout?: google.protobuf.IDuration; startDelay?: google.protobuf.IDuration; + versioningOverride?: temporal.api.workflow.v1.IVersioningOverride; } >; export function compileWorkflowOptions(options: T): WithCompiledWorkflowOptions { - const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, ...rest } = options; + const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, versioningOverride, ...rest } = + options; return { ...rest, @@ -62,6 +77,7 @@ export function compileWorkflowOptions(options: T): W workflowRunTimeout: msOptionalToTs(workflowRunTimeout), workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout), startDelay: msOptionalToTs(startDelay), + versioningOverride: versioningOverrideToProto(versioningOverride), }; } @@ -109,3 +125,19 @@ export interface WorkflowSignalWithStartOptionsWithArgs = WithWorkflowArgs; + +function versioningOverrideToProto( + vo: VersioningOverride | undefined +): temporal.api.workflow.v1.IVersioningOverride | undefined { + if (!vo) return undefined; + // TODO: Remove deprecated field assignments when versioning is non-experimental + if (vo === 'AUTO_UPGRADE') { + return { + behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + }; + } + return { + behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED, + pinnedVersion: toCanonicalString(vo.version), + }; +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 1c45a5a90..7aecd6dc7 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -22,6 +22,8 @@ export * from './logger'; export * from './priority'; export * from './retry-policy'; export type { Timestamp, Duration, StringValue } from './time'; +export * from './worker-deployments'; +export * from './workflow-definition-options'; export * from './workflow-handle'; export * from './workflow-options'; export * from './versioning-intent'; diff --git a/packages/common/src/worker-deployments.ts b/packages/common/src/worker-deployments.ts new file mode 100644 index 000000000..5b99287ad --- /dev/null +++ b/packages/common/src/worker-deployments.ts @@ -0,0 +1,59 @@ +import type { temporal } from '@temporalio/proto'; +import { makeProtoEnumConverters } from './internal-workflow'; + +/** + * Represents the version of a specific worker deployment. + * + * @experimental Deployment based versioning is experimental and may change in the future. + */ +export interface WorkerDeploymentVersion { + readonly buildId: string; + readonly deploymentName: string; +} + +/** + * @returns The canonical representation of a deployment version, which is a string in the format + * `deploymentName.buildId`. + */ +export function toCanonicalString(version: WorkerDeploymentVersion): string { + return `${version.deploymentName}.${version.buildId}`; +} + +/** + * Specifies when a workflow might move from a worker of one Build Id to another. + * + * * 'PINNED' - The workflow will be pinned to the current Build ID unless manually moved. + * * 'AUTO_UPGRADE' - The workflow will automatically move to the latest version (default Build ID + * of the task queue) when the next task is dispatched. + * + * @experimental Deployment based versioning is experimental and may change in the future. + */ +export const VersioningBehavior = { + PINNED: 'PINNED', + AUTO_UPGRADE: 'AUTO_UPGRADE', +} as const; +export type VersioningBehavior = (typeof VersioningBehavior)[keyof typeof VersioningBehavior]; + +export const [encodeVersioningBehavior, decodeVersioningBehavior] = makeProtoEnumConverters< + temporal.api.enums.v1.VersioningBehavior, + typeof temporal.api.enums.v1.VersioningBehavior, + keyof typeof temporal.api.enums.v1.VersioningBehavior, + typeof VersioningBehavior, + 'VERSIONING_BEHAVIOR_' +>( + { + [VersioningBehavior.PINNED]: 1, + [VersioningBehavior.AUTO_UPGRADE]: 2, + UNSPECIFIED: 0, + } as const, + 'VERSIONING_BEHAVIOR_' +); + +/** + * Represents versioning overrides. For example, when starting workflows. + * + * If set to 'AUTO_UPGRADE', the Workflow will run as if it is using {@link VersioningBehavior.AUTO_UPGRADE}. + * Otherwise, you select a pinned behavior, and a specific version to pin to. Currently only one + * such behavior exists, but more will be added in the future. + */ +export type VersioningOverride = { pinned_behavior: 'PINNED'; version: WorkerDeploymentVersion } | 'AUTO_UPGRADE'; diff --git a/packages/common/src/workflow-definition-options.ts b/packages/common/src/workflow-definition-options.ts new file mode 100644 index 000000000..fe7aeac0e --- /dev/null +++ b/packages/common/src/workflow-definition-options.ts @@ -0,0 +1,18 @@ +import { VersioningBehavior } from './worker-deployments'; + +/** + * Options that can be used when defining a workflow via {@link setWorkflowOptions}. + */ +export interface WorkflowDefinitionOptions { + versioningBehavior?: VersioningBehavior; +} + +type AsyncFunction = (...args: Args) => Promise; +export type WorkflowDefinitionOptionsOrGetter = WorkflowDefinitionOptions | (() => WorkflowDefinitionOptions); + +/** + * A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}. + */ +export interface WorkflowFunctionWithOptions extends AsyncFunction { + workflowDefinitionOptions: WorkflowDefinitionOptionsOrGetter; +} diff --git a/packages/common/src/workflow-options.ts b/packages/common/src/workflow-options.ts index aeb3aa349..51e63f8bd 100644 --- a/packages/common/src/workflow-options.ts +++ b/packages/common/src/workflow-options.ts @@ -5,6 +5,7 @@ import { Duration } from './time'; import { makeProtoEnumConverters } from './internal-workflow'; import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes'; import { Priority } from './priority'; +import { WorkflowFunctionWithOptions } from './workflow-definition-options'; /** * Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow. @@ -243,7 +244,9 @@ export interface WorkflowDurationOptions { export type CommonWorkflowOptions = BaseWorkflowOptions & WorkflowDurationOptions; -export function extractWorkflowType(workflowTypeOrFunc: string | T): string { +export function extractWorkflowType( + workflowTypeOrFunc: string | T | WorkflowFunctionWithOptions +): string { if (typeof workflowTypeOrFunc === 'string') return workflowTypeOrFunc as string; if (typeof workflowTypeOrFunc === 'function') { if (workflowTypeOrFunc?.name) return workflowTypeOrFunc.name; @@ -253,3 +256,9 @@ export function extractWorkflowType(workflowTypeOrFunc: stri `Invalid workflow type: expected either a string or a function, got '${typeof workflowTypeOrFunc}'` ); } + +/* eslint-disable @typescript-eslint/explicit-module-boundary-types */ +export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions { + if (obj == null) return false; + return Object.hasOwn(obj, 'workflowDefinitionOptions'); +} diff --git a/packages/core-bridge/Cargo.lock b/packages/core-bridge/Cargo.lock index fbd60238f..ef5f212f3 100644 --- a/packages/core-bridge/Cargo.lock +++ b/packages/core-bridge/Cargo.lock @@ -169,9 +169,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" dependencies = [ "addr2line", "cfg-if", @@ -252,9 +252,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.21" +version = "1.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8691782945451c1c383942c4874dbe63814f61cb57ef773cda2972682b7bb3c0" +checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1" dependencies = [ "jobserver", "libc", @@ -335,9 +335,9 @@ dependencies = [ [[package]] name = "crc" -version = "3.2.1" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" dependencies = [ "crc-catalog", ] @@ -802,9 +802,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" dependencies = [ "cfg-if", "js-sys", @@ -837,7 +837,7 @@ dependencies = [ "futures-sink", "futures-timer", "futures-util", - "getrandom 0.3.2", + "getrandom 0.3.3", "no-std-compat", "nonzero_ext", "parking_lot", @@ -851,9 +851,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633" +checksum = "a9421a676d1b147b16b82c9225157dc629087ef8ec4d5e2960f9437a90dac0a5" dependencies = [ "atomic-waker", "bytes", @@ -1026,21 +1026,22 @@ dependencies = [ [[package]] name = "icu_collections" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" dependencies = [ "displaydoc", + "potential_utf", "yoke", "zerofrom", "zerovec", ] [[package]] -name = "icu_locid" -version = "1.5.0" +name = "icu_locale_core" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" dependencies = [ "displaydoc", "litemap", @@ -1049,31 +1050,11 @@ dependencies = [ "zerovec", ] -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d" - [[package]] name = "icu_normalizer" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" dependencies = [ "displaydoc", "icu_collections", @@ -1081,67 +1062,54 @@ dependencies = [ "icu_properties", "icu_provider", "smallvec", - "utf16_iter", - "utf8_iter", - "write16", "zerovec", ] [[package]] name = "icu_normalizer_data" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" [[package]] name = "icu_properties" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a" dependencies = [ "displaydoc", "icu_collections", - "icu_locid_transform", + "icu_locale_core", "icu_properties_data", "icu_provider", - "tinystr", + "potential_utf", + "zerotrie", "zerovec", ] [[package]] name = "icu_properties_data" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2" +checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04" [[package]] name = "icu_provider" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" dependencies = [ "displaydoc", - "icu_locid", - "icu_provider_macros", + "icu_locale_core", "stable_deref_trait", "tinystr", "writeable", "yoke", "zerofrom", + "zerotrie", "zerovec", ] -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "ident_case" version = "1.0.1" @@ -1161,9 +1129,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" dependencies = [ "icu_normalizer", "icu_properties", @@ -1243,7 +1211,7 @@ version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" dependencies = [ - "getrandom 0.3.2", + "getrandom 0.3.3", "libc", ] @@ -1271,12 +1239,12 @@ checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libloading" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +checksum = "6a793df0d7afeac54f95b471d3af7f0d4fb975699f972341a4b76988d49cdf0c" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.53.0", ] [[package]] @@ -1298,9 +1266,9 @@ checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" [[package]] name = "litemap" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" [[package]] name = "lock_api" @@ -1327,6 +1295,12 @@ dependencies = [ "hashbrown 0.15.3", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "lzma-rs" version = "0.3.0" @@ -1423,9 +1397,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" [[package]] name = "neon" @@ -1534,58 +1508,74 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "opentelemetry" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.12", + "tracing", +] + [[package]] name = "opentelemetry-http" -version = "0.26.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99" +checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" dependencies = [ "async-trait", "bytes", "http", - "opentelemetry", + "opentelemetry 0.29.1", "reqwest", + "tracing", ] [[package]] name = "opentelemetry-otlp" -version = "0.26.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" dependencies = [ - "async-trait", "futures-core", "http", - "opentelemetry", + "opentelemetry 0.29.1", "opentelemetry-http", "opentelemetry-proto", "opentelemetry_sdk", "prost", "reqwest", - "thiserror 1.0.69", + "thiserror 2.0.12", "tokio", "tonic", + "tracing", ] [[package]] name = "opentelemetry-prometheus" -version = "0.17.0" -source = "git+https://github.com/open-telemetry/opentelemetry-rust.git?rev=e911383#e91138351a689cd21923c15eb48f5fbc95ded807" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "098a71a4430bb712be6130ed777335d2e5b19bc8566de5f2edddfce906def6ab" dependencies = [ "once_cell", - "opentelemetry", + "opentelemetry 0.29.1", "opentelemetry_sdk", "prometheus", - "protobuf", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.26.1" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" dependencies = [ - "opentelemetry", + "opentelemetry 0.29.1", "opentelemetry_sdk", "prost", "tonic", @@ -1593,23 +1583,22 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.26.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" dependencies = [ - "async-trait", "futures-channel", "futures-executor", "futures-util", "glob", - "once_cell", - "opentelemetry", + "opentelemetry 0.29.1", "percent-encoding", - "rand 0.8.5", + "rand 0.9.1", "serde_json", - "thiserror 1.0.69", + "thiserror 2.0.12", "tokio", "tokio-stream", + "tracing", ] [[package]] @@ -1739,6 +1728,15 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -1801,9 +1799,9 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.13.4" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" dependencies = [ "cfg-if", "fnv", @@ -1811,7 +1809,7 @@ dependencies = [ "memchr", "parking_lot", "protobuf", - "thiserror 1.0.69", + "thiserror 2.0.12", ] [[package]] @@ -1914,9 +1912,23 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.28.0" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] [[package]] name = "quanta" @@ -1935,9 +1947,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.7" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3bd15a6f2967aef83887dcb9fec0014580467e33720d073560cf015a5683012" +checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" dependencies = [ "bytes", "cfg_aliases", @@ -1955,12 +1967,13 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.11" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcbafbbdbb0f638fe3f35f3c56739f77a8a1d070cb25603226c83339b391472b" +checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ "bytes", - "getrandom 0.3.2", + "getrandom 0.3.3", + "lru-slab", "rand 0.9.1", "ring", "rustc-hash", @@ -2058,7 +2071,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom 0.3.2", + "getrandom 0.3.3", ] [[package]] @@ -2072,9 +2085,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.11" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3" +checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ "bitflags", ] @@ -2244,9 +2257,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.26" +version = "0.23.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" +checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" dependencies = [ "log", "once_cell", @@ -2280,18 +2293,19 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" dependencies = [ "web-time", + "zeroize", ] [[package]] name = "rustls-webpki" -version = "0.103.1" +version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ "ring", "rustls-pki-types", @@ -2580,12 +2594,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.19.1" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ "fastrand", - "getrandom 0.3.2", + "getrandom 0.3.3", "once_cell", "rustix", "windows-sys 0.59.0", @@ -2644,7 +2658,7 @@ dependencies = [ "itertools", "lru", "mockall", - "opentelemetry", + "opentelemetry 0.29.1", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry_sdk", @@ -2686,7 +2700,7 @@ dependencies = [ "async-trait", "derive_builder", "derive_more", - "opentelemetry", + "opentelemetry 0.29.1", "prost", "serde_json", "temporal-sdk-core-protos", @@ -2726,7 +2740,7 @@ dependencies = [ "bridge-macros", "futures", "neon", - "opentelemetry", + "opentelemetry 0.26.0", "os_pipe", "parking_lot", "prost", @@ -2820,9 +2834,9 @@ checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" [[package]] name = "tinystr" -version = "0.7.6" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" dependencies = [ "displaydoc", "zerovec", @@ -2845,9 +2859,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.2" +version = "1.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" +checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" dependencies = [ "backtrace", "bytes", @@ -3126,12 +3140,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - [[package]] name = "utf8_iter" version = "1.0.4" @@ -3144,7 +3152,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" dependencies = [ - "getrandom 0.3.2", + "getrandom 0.3.3", ] [[package]] @@ -3552,17 +3560,11 @@ dependencies = [ "bitflags", ] -[[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - [[package]] name = "writeable" -version = "0.5.5" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" [[package]] name = "xattr" @@ -3585,9 +3587,9 @@ dependencies = [ [[package]] name = "yoke" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" dependencies = [ "serde", "stable_deref_trait", @@ -3597,9 +3599,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", @@ -3668,11 +3670,22 @@ dependencies = [ "syn", ] +[[package]] +name = "zerotrie" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + [[package]] name = "zerovec" -version = "0.10.4" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" dependencies = [ "yoke", "zerofrom", @@ -3681,9 +3694,9 @@ dependencies = [ [[package]] name = "zerovec-derive" -version = "0.10.3" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", @@ -3704,7 +3717,7 @@ dependencies = [ "crossbeam-utils", "deflate64", "flate2", - "getrandom 0.3.2", + "getrandom 0.3.3", "hmac", "indexmap 2.9.0", "lzma-rs", diff --git a/packages/core-bridge/bridge-macros/src/lib.rs b/packages/core-bridge/bridge-macros/src/lib.rs index f3e31a682..6c338b346 100644 --- a/packages/core-bridge/bridge-macros/src/lib.rs +++ b/packages/core-bridge/bridge-macros/src/lib.rs @@ -9,6 +9,9 @@ use proc_macro::TokenStream; use syn::{DeriveInput, parse_macro_input}; /// Procedural macro for defining bridge types with compile-time field name conversion +/// +/// Note that enum types must all be defined on the JS side as objects with a `type` field which +/// is the kebab-case representation of the enum variant. #[proc_macro_derive(TryFromJs)] pub fn try_from_js(input: TokenStream) -> TokenStream { let input = parse_macro_input!(input as DeriveInput); diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 93471ac6d..87f2bffee 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 93471ac6d8bbf62839148a1bf97ef6dfd49aead0 +Subproject commit 87f2bffeed8677385a28a6d2a90f8d9ff8ae2246 diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 14938fa3d..7a418beca 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -32,7 +32,7 @@ use crate::{ runtime::{Runtime, RuntimeExt}, }; -pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> { +pub fn init(cx: &mut ModuleContext) -> NeonResult<()> { cx.export_function("newWorker", worker_new)?; cx.export_function("workerValidate", worker_validate)?; @@ -412,19 +412,27 @@ mod config { api::worker::{ ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError, - WorkflowSlotKind, + WorkerDeploymentOptions as CoreWorkerDeploymentOptions, + WorkerDeploymentVersion as CoreWorkerDeploymentVersion, WorkflowSlotKind, }, + protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior, }; - use bridge_macros::TryFromJs; - use super::custom_slot_supplier::CustomSlotSupplierOptions; + use crate::helpers::TryIntoJs; + use bridge_macros::TryFromJs; + use neon::context::Context; + use neon::object::Object; + use neon::prelude::JsResult; + use neon::types::JsObject; + use temporal_sdk_core::api::worker::WorkerVersioningStrategy; #[derive(TryFromJs)] pub struct BridgeWorkerOptions { identity: String, build_id: String, use_versioning: bool, + worker_deployment_options: Option, task_queue: String, namespace: String, tuner: WorkerTuner, @@ -453,14 +461,44 @@ mod config { }, } + #[derive(TryFromJs)] + pub struct WorkerDeploymentOptions { + version: WorkerDeploymentVersion, + use_worker_versioning: bool, + default_versioning_behavior: VersioningBehavior, + } + + #[derive(TryFromJs)] + pub struct WorkerDeploymentVersion { + build_id: String, + deployment_name: String, + } + + #[derive(TryFromJs)] + pub enum VersioningBehavior { + Pinned, + AutoUpgrade, + } + impl BridgeWorkerOptions { pub(crate) fn into_core_config(self) -> Result { // Set all other options let mut builder = WorkerConfigBuilder::default(); builder .client_identity_override(Some(self.identity)) - .worker_build_id(self.build_id) - .use_worker_versioning(self.use_versioning) + .versioning_strategy({ + if let Some(dopts) = self.worker_deployment_options { + WorkerVersioningStrategy::WorkerDeploymentBased(dopts.into()) + } else if self.use_versioning { + WorkerVersioningStrategy::LegacyBuildIdBased { + build_id: self.build_id, + } + } else { + WorkerVersioningStrategy::None { + build_id: self.build_id, + } + } + }) .task_queue(self.task_queue) .namespace(self.namespace) .tuner(self.tuner.into_core_config()?) @@ -483,13 +521,13 @@ mod config { fn from(val: PollerBehavior) -> Self { match val { PollerBehavior::SimpleMaximum { maximum } => { - CorePollerBehavior::SimpleMaximum(maximum) + Self::SimpleMaximum(maximum) } PollerBehavior::Autoscaling { minimum, maximum, initial, - } => CorePollerBehavior::Autoscaling { + } => Self::Autoscaling { minimum, maximum, initial, @@ -498,6 +536,56 @@ mod config { } } + impl From for CoreWorkerDeploymentOptions { + fn from(val: WorkerDeploymentOptions) -> Self { + Self { + version: val.version.into(), + use_worker_versioning: val.use_worker_versioning, + default_versioning_behavior: Some(val.default_versioning_behavior.into()), + } + } + } + + impl From for CoreWorkerDeploymentVersion { + fn from(val: WorkerDeploymentVersion) -> Self { + Self { + build_id: val.build_id, + deployment_name: val.deployment_name, + } + } + } + + impl From for WorkerDeploymentVersion { + fn from(val: CoreWorkerDeploymentVersion) -> Self { + Self { + build_id: val.build_id, + deployment_name: val.deployment_name, + } + } + } + + impl TryIntoJs for WorkerDeploymentVersion { + type Output = JsObject; + + fn try_into_js<'cx>(self, cx: &mut impl Context<'cx>) -> JsResult<'cx, Self::Output> { + let obj = cx.empty_object(); + let bid = self.build_id.try_into_js(cx)?; + obj.set(cx, "buildId", bid)?; + let dn = self.deployment_name.try_into_js(cx)?; + obj.set(cx, "deploymentName", dn)?; + Ok(obj) + } + } + + impl From for CoreVersioningBehavior { + fn from(val: VersioningBehavior) -> Self { + match val { + VersioningBehavior::Pinned => Self::Pinned, + VersioningBehavior::AutoUpgrade => Self::AutoUpgrade, + } + } + } + #[derive(TryFromJs)] #[allow(clippy::struct_field_names)] pub(super) struct WorkerTuner { @@ -613,7 +701,7 @@ mod custom_slot_supplier { use tracing::warn; use crate::helpers::*; - + use crate::worker::config::WorkerDeploymentVersion; // Custom Slot Supplier //////////////////////////////////////////////////////////////////////////// pub(super) struct SlotSupplierBridge { @@ -639,7 +727,10 @@ mod custom_slot_supplier { slot_type: SK::kind().into(), task_queue: ctx.task_queue().to_string(), worker_identity: ctx.worker_identity().to_string(), - worker_build_id: ctx.worker_build_id().to_string(), + worker_deployment_version: ctx + .worker_deployment_version() + .clone() + .map(Into::into), is_sticky: ctx.is_sticky(), }; @@ -680,7 +771,10 @@ mod custom_slot_supplier { slot_type: SK::kind().into(), task_queue: ctx.task_queue().to_string(), worker_identity: ctx.worker_identity().to_string(), - worker_build_id: ctx.worker_build_id().to_string(), + worker_deployment_version: ctx + .worker_deployment_version() + .clone() + .map(Into::into), is_sticky: ctx.is_sticky(), }; @@ -809,7 +903,7 @@ mod custom_slot_supplier { slot_type: SlotKindType, task_queue: String, worker_identity: String, - worker_build_id: String, + worker_deployment_version: Option, is_sticky: bool, } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 539a8ce4d..90ec514f7 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -41,6 +41,7 @@ export type JsonString<_T> = string; //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newRuntime(telemOptions: RuntimeOptions): Runtime; + export declare function runtimeShutdown(runtime: Runtime): void; export interface Runtime { @@ -98,9 +99,13 @@ export interface OtelMetricsExporterOptions { //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newClient(runtime: Runtime, clientOptions: ClientOptions): Promise; + export declare function clientUpdateHeaders(client: Client, headers: Record): void; + export declare function clientUpdateApiKey(client: Client, apiKey: string): void; + export declare function clientSendRequest(client: Client, call: RpcCall): Promise; + export declare function clientClose(client: Client): void; export interface Client { @@ -155,16 +160,21 @@ export interface RpcCall { //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newWorker(client: Client, workerOptions: WorkerOptions): Worker; + export declare function workerValidate(worker: Worker): Promise; export declare function workerPollWorkflowActivation(worker: Worker): Promise; + export declare function workerCompleteWorkflowActivation(worker: Worker, result: Buffer): Promise; export declare function workerPollActivityTask(worker: Worker): Promise; + export declare function workerCompleteActivityTask(worker: Worker, result: Buffer): Promise; + export declare function workerRecordActivityHeartbeat(worker: Worker, heartbeat: Buffer): void; export declare function workerInitiateShutdown(worker: Worker): void; + export declare function workerFinalizeShutdown(worker: Worker): Promise; export interface Worker { @@ -175,6 +185,7 @@ export interface WorkerOptions { identity: string; buildId: string; useVersioning: boolean; + workerDeploymentOptions: Option; taskQueue: string; namespace: string; tuner: WorkerTunerOptions; @@ -203,6 +214,19 @@ export type PollerBehavior = initial: number; }; +export type WorkerDeploymentOptions = { + version: WorkerDeploymentVersion; + useWorkerVersioning: boolean; + defaultVersioningBehavior: VersioningBehavior; +}; + +export type WorkerDeploymentVersion = { + buildId: string; + deploymentName: string; +}; + +export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' }; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Worker Tuner //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -238,9 +262,13 @@ interface ResourceBasedTunerOptions { export interface CustomSlotSupplierOptions { type: 'custom'; + reserveSlot(ctx: SlotReserveContext, abortSignal: AbortSignal): Promise; + tryReserveSlot(ctx: SlotReserveContext): Option; + markSlotUsed(ctx: SlotMarkUsedContext): void; + releaseSlot(ctx: SlotReleaseContext): void; } @@ -268,7 +296,7 @@ export interface SlotReserveContext { slotType: SlotInfo['type']; taskQueue: string; workerIdentity: string; - workerBuildId: string; + workerDeploymentVersion: Option; isSticky: boolean; } @@ -290,7 +318,9 @@ export interface SlotPermit {} //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newReplayWorker(runtime: Runtime, workerOptions: WorkerOptions): [Worker, HistoryPusher]; + export declare function pushHistory(pusher: HistoryPusher, workflowId: string, history: Buffer): Promise; + export declare function closeHistoryStream(pusher: HistoryPusher): void; export interface HistoryPusher { @@ -302,7 +332,9 @@ export interface HistoryPusher { //////////////////////////////////////////////////////////////////////////////////////////////////// export declare function newEphemeralServer(runtime: Runtime, config: EphemeralServerConfig): Promise; + export declare function ephemeralServerGetTarget(server: EphemeralServer): string; + export declare function ephemeralServerShutdown(server: EphemeralServer): Promise; export interface EphemeralServer { diff --git a/packages/proto/package.json b/packages/proto/package.json index 4fdc06036..42672d061 100644 --- a/packages/proto/package.json +++ b/packages/proto/package.json @@ -21,11 +21,11 @@ "license": "MIT", "dependencies": { "long": "^5.2.3", - "protobufjs": "^7.2.5" + "protobufjs": "^7.5.1" }, "devDependencies": { "glob": "^10.3.10", - "protobufjs-cli": "^1.1.2" + "protobufjs-cli": "^1.1.3" }, "bugs": { "url": "https://github.com/temporalio/sdk-typescript/issues" diff --git a/packages/proto/scripts/compile-proto.js b/packages/proto/scripts/compile-proto.js index 8a5bbd13c..5c11a70ae 100644 --- a/packages/proto/scripts/compile-proto.js +++ b/packages/proto/scripts/compile-proto.js @@ -9,24 +9,8 @@ const pbts = require('protobufjs-cli/pbts'); const outputDir = resolve(__dirname, '../protos'); const jsOutputFile = resolve(outputDir, 'json-module.js'); const tempFile = resolve(outputDir, 'temp.js'); -const protoBaseDir = resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos'); -const coreProtoPath = resolve(protoBaseDir, 'local/temporal/sdk/core/core_interface.proto'); -const workflowServiceProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/workflowservice/v1/service.proto'); -const operatorServiceProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/operatorservice/v1/service.proto'); -const cloudServiceProtoPath = resolve( - protoBaseDir, - 'api_cloud_upstream/temporal/api/cloud/cloudservice/v1/service.proto' -); -const errorDetailsProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/errordetails/v1/message.proto'); -const workflowMetadataProtoPath = resolve(protoBaseDir, 'api_upstream/temporal/api/sdk/v1/workflow_metadata.proto'); -const testServiceRRProtoPath = resolve( - protoBaseDir, - 'testsrv_upstream/temporal/api/testservice/v1/request_response.proto' -); -const testServiceProtoPath = resolve(protoBaseDir, 'testsrv_upstream/temporal/api/testservice/v1/service.proto'); -const healthServiceProtoPath = resolve(protoBaseDir, 'grpc/health/v1/health.proto'); -const googleRpcStatusProtoPath = resolve(protoBaseDir, 'google/rpc/status.proto'); +const protoBaseDir = resolve(__dirname, '../../core-bridge/sdk-core/sdk-core-protos/protos'); function mtime(path) { try { @@ -40,28 +24,15 @@ function mtime(path) { } async function compileProtos(dtsOutputFile, ...args) { - // Use --root to avoid conflicting with user's root - // and to avoid this error: https://github.com/protobufjs/protobuf.js/issues/1114 const pbjsArgs = [ - ...args, - '--wrap', - 'commonjs', + ...['--wrap', 'commonjs'], '--force-long', '--no-verify', '--alt-comment', - '--root', - '__temporal', - resolve(require.resolve('protobufjs'), '../google/protobuf/descriptor.proto'), - coreProtoPath, - workflowServiceProtoPath, - operatorServiceProtoPath, - cloudServiceProtoPath, - errorDetailsProtoPath, - workflowMetadataProtoPath, - testServiceRRProtoPath, - testServiceProtoPath, - healthServiceProtoPath, - googleRpcStatusProtoPath, + // Use --root to avoid conflicting with user's root + // and to avoid this error: https://github.com/protobufjs/protobuf.js/issues/1114 + ...['--root', '__temporal'], + ...args, ]; console.log(`Creating protobuf JS definitions`); @@ -101,14 +72,32 @@ async function main() { return; } - await compileProtos( - resolve(outputDir, 'root.d.ts'), - '--path', + const rootDirs = [ resolve(protoBaseDir, 'api_upstream'), - '--path', + resolve(protoBaseDir, 'testsrv_upstream'), resolve(protoBaseDir, 'local'), - '--path', - resolve(protoBaseDir, 'api_cloud_upstream') + resolve(protoBaseDir, 'api_cloud_upstream'), + protoBaseDir, // 'grpc' and 'google' are directly under protoBaseDir + ]; + + const entrypoints = [ + 'temporal/sdk/core/core_interface.proto', + 'temporal/api/workflowservice/v1/service.proto', + 'temporal/api/operatorservice/v1/service.proto', + 'temporal/api/cloud/cloudservice/v1/service.proto', + 'temporal/api/errordetails/v1/message.proto', + 'temporal/api/sdk/v1/workflow_metadata.proto', + 'temporal/api/testservice/v1/request_response.proto', + 'temporal/api/testservice/v1/service.proto', + 'grpc/health/v1/health.proto', + 'google/rpc/status.proto', + ]; + + await compileProtos( + resolve(outputDir, 'root.d.ts'), + // Make sure to include all + ...rootDirs.flatMap((dir) => ['--path', dir]), + ...entrypoints ); console.log('Done'); diff --git a/packages/test/package.json b/packages/test/package.json index 044a46c7e..1ec5fa5da 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -51,8 +51,8 @@ "istanbul-lib-coverage": "^3.2.2", "long": "^5.2.3", "node-fetch": "^2.7.0", - "protobufjs": "^7.2.5", - "protobufjs-cli": "^1.1.2", + "protobufjs": "^7.5.1", + "protobufjs-cli": "^1.1.3", "rxjs": "7.8.1", "stack-utils": "^2.0.6", "uuid": "^9.0.1" diff --git a/packages/test/src/deployment-versioning-no-annotations/index.ts b/packages/test/src/deployment-versioning-no-annotations/index.ts new file mode 100644 index 000000000..d476dfd42 --- /dev/null +++ b/packages/test/src/deployment-versioning-no-annotations/index.ts @@ -0,0 +1,14 @@ +import { setHandler, condition } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v1'); + await condition(() => doFinish); + return 'version-v1'; +} + +export default async function (): Promise { + return 'dynamic'; +} diff --git a/packages/test/src/deployment-versioning-v1/index.ts b/packages/test/src/deployment-versioning-v1/index.ts new file mode 100644 index 000000000..a2a965407 --- /dev/null +++ b/packages/test/src/deployment-versioning-v1/index.ts @@ -0,0 +1,28 @@ +import { setHandler, condition, setWorkflowOptions, workflowInfo } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +setWorkflowOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v1'); + await condition(() => doFinish); + return 'version-v1'; +} + +// Dynamic/default workflow handler +setWorkflowOptions({ versioningBehavior: 'PINNED' }, module.exports.default); +export default async function (): Promise { + return 'dynamic'; +} + +setWorkflowOptions(() => { + // Need to ensure accessing workflow context still works in here + workflowInfo(); + return { + versioningBehavior: 'PINNED', + }; +}, usesGetter); +export async function usesGetter(): Promise { + return 'usesGetter'; +} diff --git a/packages/test/src/deployment-versioning-v2/index.ts b/packages/test/src/deployment-versioning-v2/index.ts new file mode 100644 index 000000000..82e9885c9 --- /dev/null +++ b/packages/test/src/deployment-versioning-v2/index.ts @@ -0,0 +1,11 @@ +import { setHandler, condition, setWorkflowOptions } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +setWorkflowOptions({ versioningBehavior: 'PINNED' }, deploymentVersioning); +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v2'); + await condition(() => doFinish); + return 'version-v2'; +} diff --git a/packages/test/src/deployment-versioning-v3/index.ts b/packages/test/src/deployment-versioning-v3/index.ts new file mode 100644 index 000000000..a1e294b14 --- /dev/null +++ b/packages/test/src/deployment-versioning-v3/index.ts @@ -0,0 +1,11 @@ +import { setHandler, condition, setWorkflowOptions } from '@temporalio/workflow'; +import { unblockSignal, versionQuery } from '../workflows'; + +setWorkflowOptions({ versioningBehavior: 'AUTO_UPGRADE' }, deploymentVersioning); +export async function deploymentVersioning(): Promise { + let doFinish = false; + setHandler(unblockSignal, () => void (doFinish = true)); + setHandler(versionQuery, () => 'v3'); + await condition(() => doFinish); + return 'version-v3'; +} diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 7abcbacd7..5cb73758e 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -41,6 +41,7 @@ const defaultDynamicConfigOptions = [ 'frontend.workerVersioningDataAPIs=true', 'frontend.workerVersioningWorkflowAPIs=true', 'system.enableActivityEagerExecution=true', + 'system.enableDeploymentVersions=true', 'system.enableEagerWorkflowStart=true', 'system.forceSearchAttributesCacheRefreshOnRead=true', 'worker.buildIdScavengerEnabled=true', @@ -134,7 +135,14 @@ export function makeTestFunction(opts: { return makeConfigurableEnvironmentTestFn({ recordedLogs: opts.recordedLogs, createTestContext: async (_t: ExecutionContext): Promise => { - const env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts); + let env: TestWorkflowEnvironment; + if (process.env.TEMPORAL_SERVICE_ADDRESS) { + env = await TestWorkflowEnvironment.createFromExistingServer({ + address: process.env.TEMPORAL_SERVICE_ADDRESS, + }); + } else { + env = await createLocalTestEnvironment(opts.workflowEnvironmentOpts); + } return { workflowBundle: await createTestWorkflowBundle({ workflowsPath: opts.workflowsPath, diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index 0190ecd54..218be7169 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -11,6 +11,7 @@ import { Payload, PayloadCodec } from '@temporalio/common'; import { historyToJSON } from '@temporalio/common/lib/proto-utils'; import * as iface from '@temporalio/proto'; import { + ExistingServerTestWorkflowEnvironmentOptions, LocalTestWorkflowEnvironmentOptions, TestWorkflowEnvironment as RealTestWorkflowEnvironment, TimeSkippingTestWorkflowEnvironmentOptions, @@ -213,6 +214,12 @@ export class TestWorkflowEnvironment extends RealTestWorkflowEnvironment { : undefined), }); } + + static async createFromExistingServer( + opts?: ExistingServerTestWorkflowEnvironmentOptions + ): Promise { + return RealTestWorkflowEnvironment.createFromExistingServer(opts); + } } // Some of our tests expect "default custom search attributes" to exists, which used to be the case diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index 6458f6d7c..04d930084 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -260,6 +260,7 @@ const GenericConfigs = { taskQueue: 'default', identity: 'test-worker', buildId: 'test-build-id', + workerDeploymentOptions: null, useVersioning: false, namespace: 'default', tuner: { diff --git a/packages/test/src/test-integration-split-one.ts b/packages/test/src/test-integration-split-one.ts index 115285c14..225f0df5a 100644 --- a/packages/test/src/test-integration-split-one.ts +++ b/packages/test/src/test-integration-split-one.ts @@ -737,7 +737,8 @@ test.serial('Workflow can read WorkflowInfo', configMacro, async (t, config) => historySize: res.historySize, startTime: res.startTime, runStartTime: res.runStartTime, - currentBuildId: res.currentBuildId, + currentBuildId: res.currentBuildId, // eslint-disable-line deprecation/deprecation + currentDeploymentVersion: res.currentDeploymentVersion, // unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast unsafe: { isReplaying: false } as UnsafeWorkflowInfo, priority: {}, diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 3d0c44321..ea05b71f6 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -496,7 +496,7 @@ export async function buildIdTester(): Promise { }); workflow.setHandler(getBuildIdQuery, () => { - return workflow.workflowInfo().currentBuildId ?? ''; + return workflow.workflowInfo().currentBuildId ?? ''; // eslint-disable-line deprecation/deprecation }); // The unblock signal will only be sent once we are in Worker 1.1. diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index 64be0cbaa..a39bfcb1a 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -94,7 +94,7 @@ if (RUN_INTEGRATION_TESTS) { }); // Capture volatile values that are hard to predict - const { historySize, startTime, runStartTime, currentBuildId } = recordedCalls[0].info; + const { historySize, startTime, runStartTime, currentBuildId, currentDeploymentVersion } = recordedCalls[0].info; // eslint-disable-line deprecation/deprecation t.true(historySize > 300); const info: WorkflowInfo = { @@ -130,6 +130,7 @@ if (RUN_INTEGRATION_TESTS) { startTime, runStartTime, currentBuildId, + currentDeploymentVersion, // unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast unsafe: { isReplaying: false, diff --git a/packages/test/src/test-worker-deployment-versioning.ts b/packages/test/src/test-worker-deployment-versioning.ts new file mode 100644 index 000000000..0a75800d8 --- /dev/null +++ b/packages/test/src/test-worker-deployment-versioning.ts @@ -0,0 +1,409 @@ +/** + * Tests worker-deployment-based versioning + * + * @module + */ +import assert from 'assert'; +import { randomUUID } from 'crypto'; +import asyncRetry from 'async-retry'; +import { ExecutionContext } from 'ava'; +import { Client } from '@temporalio/client'; +import { toCanonicalString, WorkerDeploymentVersion } from '@temporalio/common'; +import { temporal } from '@temporalio/proto'; +import { Worker } from './helpers'; +import { Context, makeTestFunction } from './helpers-integration'; +import { unblockSignal, versionQuery } from './workflows'; + +const test = makeTestFunction({ workflowsPath: __filename }); + +test('Worker deployment based versioning', async (t) => { + const taskQueue = 'worker-deployment-based-versioning-' + randomUUID(); + const deploymentName = 'deployment-' + randomUUID(); + const { client, nativeConnection } = t.context.env; + + const w1DeploymentVersion = { + buildId: '1.0', + deploymentName, + }; + const w2DeploymentVersion = { + buildId: '2.0', + deploymentName, + }; + const w3DeploymentVersion = { + buildId: '3.0', + deploymentName, + }; + + const worker1 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v1'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: w1DeploymentVersion, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + const worker1Promise = worker1.run(); + worker1Promise.catch((err) => { + t.fail('Worker 1.0 run error: ' + JSON.stringify(err)); + }); + + const worker2 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v2'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: w2DeploymentVersion, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + const worker2Promise = worker2.run(); + worker2Promise.catch((err) => { + t.fail('Worker 2.0 run error: ' + JSON.stringify(err)); + }); + + const worker3 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v3'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: w3DeploymentVersion, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + const worker3Promise = worker3.run(); + worker3Promise.catch((err) => { + t.fail('Worker 3.0 run error: ' + JSON.stringify(err)); + }); + + // Wait for worker 1 to be visible and set as current version + const describeResp1 = await waitUntilWorkerDeploymentVisible(client, w1DeploymentVersion); + await setCurrentDeploymentVersion(client, describeResp1.conflictToken, w1DeploymentVersion); + + // Start workflow 1 which will use the 1.0 worker on auto-upgrade + const wf1 = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: 'deployment-versioning-v1-' + randomUUID(), + }); + const state1 = await wf1.query(versionQuery); + assert.equal(state1, 'v1'); + + // Wait for worker 2 to be visible and set as current version + const describeResp2 = await waitUntilWorkerDeploymentVisible(client, w2DeploymentVersion); + await setCurrentDeploymentVersion(client, describeResp2.conflictToken, w2DeploymentVersion); + + // Start workflow 2 which will use the 2.0 worker pinned + const wf2 = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: 'deployment-versioning-v2-' + randomUUID(), + }); + const state2 = await wf2.query(versionQuery); + assert.equal(state2, 'v2'); + + // Wait for worker 3 to be visible and set as current version + const describeResp3 = await waitUntilWorkerDeploymentVisible(client, w3DeploymentVersion); + await setCurrentDeploymentVersion(client, describeResp3.conflictToken, w3DeploymentVersion); + + // Start workflow 3 which will use the 3.0 worker on auto-upgrade + const wf3 = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: 'deployment-versioning-v3-' + randomUUID(), + }); + const state3 = await wf3.query(versionQuery); + assert.equal(state3, 'v3'); + + // Signal all workflows to finish + await wf1.signal(unblockSignal); + await wf2.signal(unblockSignal); + await wf3.signal(unblockSignal); + + const res1 = await wf1.result(); + const res2 = await wf2.result(); + const res3 = await wf3.result(); + + assert.equal(res1, 'version-v3'); + assert.equal(res2, 'version-v2'); + assert.equal(res3, 'version-v3'); + + worker1.shutdown(); + worker2.shutdown(); + worker3.shutdown(); + await worker1Promise; + await worker2Promise; + await worker3Promise; + t.pass(); +}); + +test('Worker deployment based versioning with ramping', async (t) => { + const taskQueue = 'worker-deployment-based-ramping-' + randomUUID(); + const deploymentName = 'deployment-ramping-' + randomUUID(); + const { client, nativeConnection } = t.context.env; + + const v1 = { + buildId: '1.0', + deploymentName, + }; + const v2 = { + buildId: '2.0', + deploymentName, + }; + + const worker1 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v1'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: v1, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + const worker1Promise = worker1.run(); + worker1Promise.catch((err) => { + t.fail('Worker 1.0 run error: ' + JSON.stringify(err)); + }); + + const worker2 = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v2'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: v2, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + const worker2Promise = worker2.run(); + worker2Promise.catch((err) => { + t.fail('Worker 2.0 run error: ' + JSON.stringify(err)); + }); + + // Wait for worker deployments to be visible + await waitUntilWorkerDeploymentVisible(client, v1); + const describeResp = await waitUntilWorkerDeploymentVisible(client, v2); + + // Set current version to v1 and ramp v2 to 100% + let conflictToken = (await setCurrentDeploymentVersion(client, describeResp.conflictToken, v1)).conflictToken; + conflictToken = (await setRampingVersion(client, conflictToken, v2, 100)).conflictToken; + + // Run workflows and verify they run on v2 + for (let i = 0; i < 3; i++) { + const wf = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: `versioning-ramp-100-${i}-${randomUUID()}`, + }); + await wf.signal(unblockSignal); + const res = await wf.result(); + assert.equal(res, 'version-v2'); + } + + // Set ramp to 0, expecting workflows to run on v1 + conflictToken = (await setRampingVersion(client, conflictToken, v2, 0)).conflictToken; + for (let i = 0; i < 3; i++) { + const wf = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: `versioning-ramp-0-${i}-${randomUUID()}`, + }); + await wf.signal(unblockSignal); + const res = await wf.result(); + assert.equal(res, 'version-v1'); + } + + // Set ramp to 50 and eventually verify workflows run on both versions + await setRampingVersion(client, conflictToken, v2, 50); + const seenResults = new Set(); + + const runAndRecord = async () => { + const wf = await client.workflow.start('deploymentVersioning', { + taskQueue, + workflowId: `versioning-ramp-50-${randomUUID()}`, + }); + await wf.signal(unblockSignal); + return await wf.result(); + }; + + await asyncRetry( + async () => { + const res = await runAndRecord(); + seenResults.add(res); + if (!seenResults.has('version-v1') || !seenResults.has('version-v2')) { + throw new Error('Not all versions seen yet'); + } + }, + { maxTimeout: 1000, retries: 20 } + ); + + worker1.shutdown(); + worker2.shutdown(); + await worker1Promise; + await worker2Promise; + t.pass(); +}); + +async function testWorkerDeploymentWithDynamicBehavior( + t: ExecutionContext, + workflowName: string, + expectedResult: string +) { + if (t.context.env.supportsTimeSkipping) { + t.pass("Test Server doesn't support worker deployments"); + return; + } + + const taskQueue = 'worker-deployment-dynamic-' + randomUUID(); + const deploymentName = 'deployment-dynamic-' + randomUUID(); + const { client, nativeConnection } = t.context.env; + + const version = { + buildId: '1.0', + deploymentName, + }; + + const worker = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-v1'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version, + defaultVersioningBehavior: 'AUTO_UPGRADE', + }, + connection: nativeConnection, + }); + + const workerPromise = worker.run(); + workerPromise.catch((err) => { + t.fail('Worker run error: ' + JSON.stringify(err)); + }); + + const describeResp = await waitUntilWorkerDeploymentVisible(client, version); + await setCurrentDeploymentVersion(client, describeResp.conflictToken, version); + + const wf = await client.workflow.start(workflowName, { + taskQueue, + workflowId: 'dynamic-workflow-versioning-' + randomUUID(), + }); + + const result = await wf.result(); + assert.equal(result, expectedResult); + + const history = await wf.fetchHistory(); + const hasPinnedVersioningBehavior = history.events!.some( + (event) => + event.workflowTaskCompletedEventAttributes && + event.workflowTaskCompletedEventAttributes.versioningBehavior === + temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED + ); + assert.ok(hasPinnedVersioningBehavior, 'Expected workflow to use pinned versioning behavior'); + + worker.shutdown(); + await workerPromise; + t.pass(); +} + +test('Worker deployment with dynamic workflow static behavior', async (t) => { + await testWorkerDeploymentWithDynamicBehavior(t, 'cooldynamicworkflow', 'dynamic'); +}); + +test('Worker deployment with behavior in getter', async (t) => { + await testWorkerDeploymentWithDynamicBehavior(t, 'usesGetter', 'usesGetter'); +}); + +test('Workflows can use default versioning behavior', async (t) => { + const taskQueue = 'task-queue-default-versioning-' + randomUUID(); + const deploymentName = 'deployment-default-versioning-' + randomUUID(); + const { client, nativeConnection } = t.context.env; + + const workerV1 = { + buildId: '1.0', + deploymentName, + }; + + const worker = await Worker.create({ + workflowsPath: require.resolve('./deployment-versioning-no-annotations'), + taskQueue, + workerDeploymentOptions: { + useWorkerVersioning: true, + version: workerV1, + defaultVersioningBehavior: 'PINNED', + }, + connection: nativeConnection, + }); + + const workerPromise = worker.run(); + workerPromise.catch((err) => { + t.fail('Worker run error: ' + JSON.stringify(err)); + }); + + const describeResp = await waitUntilWorkerDeploymentVisible(client, workerV1); + await setCurrentDeploymentVersion(client, describeResp.conflictToken, workerV1); + + const wf = await client.workflow.start('noVersioningAnnotationWorkflow', { + taskQueue, + workflowId: 'default-versioning-behavior-' + randomUUID(), + }); + + await wf.result(); + + const history = await wf.fetchHistory(); + const hasPinnedVersioningBehavior = history.events!.some( + (event) => + event.workflowTaskCompletedEventAttributes && + event.workflowTaskCompletedEventAttributes.versioningBehavior === + temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED + ); + assert.ok(hasPinnedVersioningBehavior, 'Expected workflow to use pinned versioning behavior'); + + worker.shutdown(); + await workerPromise; + t.pass(); +}); + +async function setRampingVersion( + client: Client, + conflictToken: Uint8Array, + version: WorkerDeploymentVersion, + percentage: number +) { + return await client.workflowService.setWorkerDeploymentRampingVersion({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + version: toCanonicalString(version), + conflictToken, + percentage, + }); +} + +async function waitUntilWorkerDeploymentVisible(client: Client, version: WorkerDeploymentVersion) { + return await asyncRetry( + async () => { + const resp = await client.workflowService.describeWorkerDeployment({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + }); + const isVersionVisible = resp.workerDeploymentInfo!.versionSummaries!.some( + (vs) => vs.version === toCanonicalString(version) + ); + if (!isVersionVisible) { + throw new Error('Version not visible yet'); + } + return resp; + }, + { maxTimeout: 1000, retries: 10 } + ); +} + +async function setCurrentDeploymentVersion( + client: Client, + conflictToken: Uint8Array, + version: WorkerDeploymentVersion +) { + return await client.workflowService.setWorkerDeploymentCurrentVersion({ + namespace: client.options.namespace, + deploymentName: version.deploymentName, + version: toCanonicalString(version), + conflictToken, + }); +} diff --git a/packages/test/src/workflows/definitions.ts b/packages/test/src/workflows/definitions.ts index e5aa77e01..b1056fda1 100644 --- a/packages/test/src/workflows/definitions.ts +++ b/packages/test/src/workflows/definitions.ts @@ -1,8 +1,9 @@ /* eslint-disable no-duplicate-imports */ -import { defineSignal } from '@temporalio/workflow'; +import { defineQuery, defineSignal } from '@temporalio/workflow'; export const activityStartedSignal = defineSignal('activityStarted'); export const failSignal = defineSignal('fail'); export const failWithMessageSignal = defineSignal<[string]>('fail'); export const argsTestSignal = defineSignal<[number, string]>('argsTest'); export const unblockSignal = defineSignal('unblock'); +export const versionQuery = defineQuery('version'); diff --git a/packages/testing/src/index.ts b/packages/testing/src/index.ts index d00594ce4..8e8077e5c 100644 --- a/packages/testing/src/index.ts +++ b/packages/testing/src/index.ts @@ -14,6 +14,7 @@ export { TestWorkflowEnvironment, type LocalTestWorkflowEnvironmentOptions, type TimeSkippingTestWorkflowEnvironmentOptions, + type ExistingServerTestWorkflowEnvironmentOptions, } from './testing-workflow-environment'; export { diff --git a/packages/testing/src/testing-workflow-environment.ts b/packages/testing/src/testing-workflow-environment.ts index 9cfe4f91a..be78ea7ec 100644 --- a/packages/testing/src/testing-workflow-environment.ts +++ b/packages/testing/src/testing-workflow-environment.ts @@ -1,5 +1,5 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import -import { AsyncCompletionClient, Client, WorkflowClient } from '@temporalio/client'; +import { AsyncCompletionClient, Client, WorkflowClient, ClientOptions } from '@temporalio/client'; import { Duration, TypedSearchAttributes } from '@temporalio/common'; import { msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time'; import { NativeConnection, Runtime } from '@temporalio/worker'; @@ -25,6 +25,15 @@ export type TimeSkippingTestWorkflowEnvironmentOptions = { client?: ClientOptionsForTestEnv; }; +/** + * Options for {@link TestWorkflowEnvironment.createExistingServer} + */ +export type ExistingServerTestWorkflowEnvironmentOptions = { + /** If not set, defaults to localhost:7233 */ + address?: string; + client?: ClientOptions; +}; + /** * An execution environment for running Workflow integration tests. * @@ -72,7 +81,7 @@ export class TestWorkflowEnvironment { private readonly runtime: Runtime, public readonly options: TestWorkflowEnvironmentOptionsWithDefaults, public readonly supportsTimeSkipping: boolean, - protected readonly server: native.EphemeralServer, + protected readonly server: native.EphemeralServer | 'existing', connection: Connection, nativeConnection: NativeConnection, namespace: string | undefined @@ -107,9 +116,9 @@ export class TestWorkflowEnvironment { * environment, not to the workflow under test. We highly recommend running tests serially when using a single * environment or creating a separate environment per test. * - * By default, the latest release of the Test Serveer will be downloaded and cached to a temporary directory + * By default, the latest release of the Test Server will be downloaded and cached to a temporary directory * (e.g. `$TMPDIR/temporal-test-server-sdk-typescript-*` or `%TEMP%/temporal-test-server-sdk-typescript-*.exe`). Note - * that existing cached binairies will be reused without validation that they are still up-to-date, until the SDK + * that existing cached binaries will be reused without validation that they are still up-to-date, until the SDK * itself is updated. Alternatively, a specific version number of the Test Server may be provided, or the path to an * existing Test Server binary may be supplied; see {@link LocalTestWorkflowEnvironmentOptions.server.executable}. * @@ -141,7 +150,7 @@ export class TestWorkflowEnvironment { * * By default, the latest release of the CLI will be downloaded and cached to a temporary directory * (e.g. `$TMPDIR/temporal-sdk-typescript-*` or `%TEMP%/temporal-sdk-typescript-*.exe`). Note that existing cached - * binairies will be reused without validation that they are still up-to-date, until the SDK itself is updated. + * binaries will be reused without validation that they are still up-to-date, until the SDK itself is updated. * Alternatively, a specific version number of the CLI may be provided, or the path to an existing CLI binary may be * supplied; see {@link LocalTestWorkflowEnvironmentOptions.server.executable}. * @@ -158,6 +167,22 @@ export class TestWorkflowEnvironment { }); } + /** + * Create a new test environment using an existing server. You must already be running a server, which the test + * environment will connect to. + */ + static async createFromExistingServer( + opts?: ExistingServerTestWorkflowEnvironmentOptions + ): Promise { + return await this.create({ + server: { type: 'existing' }, + client: opts?.client, + namespace: opts?.client?.namespace ?? 'default', + supportsTimeSkipping: false, + address: opts?.address, + }); + } + /** * Create a new test environment */ @@ -165,25 +190,33 @@ export class TestWorkflowEnvironment { opts: TestWorkflowEnvironmentOptions & { supportsTimeSkipping: boolean; namespace?: string; + address?: string; } ): Promise { const { supportsTimeSkipping, namespace, ...rest } = opts; const optsWithDefaults = addDefaults(filterNullAndUndefined(rest)); - // Add search attributes to CLI server arguments - if ('searchAttributes' in optsWithDefaults.server && optsWithDefaults.server.searchAttributes) { - let newArgs: string[] = []; - for (const { name, type } of optsWithDefaults.server.searchAttributes) { - newArgs.push('--search-attribute'); - newArgs.push(`${name}=${TypedSearchAttributes.toMetadataType(type)}`); + let address: string; + const runtime = Runtime.instance(); + let server: native.EphemeralServer | 'existing'; + if (optsWithDefaults.server.type !== 'existing') { + // Add search attributes to CLI server arguments + if ('searchAttributes' in optsWithDefaults.server && optsWithDefaults.server.searchAttributes) { + let newArgs: string[] = []; + for (const { name, type } of optsWithDefaults.server.searchAttributes) { + newArgs.push('--search-attribute'); + newArgs.push(`${name}=${TypedSearchAttributes.toMetadataType(type)}`); + } + newArgs = newArgs.concat(optsWithDefaults.server.extraArgs ?? []); + optsWithDefaults.server.extraArgs = newArgs; } - newArgs = newArgs.concat(optsWithDefaults.server.extraArgs ?? []); - optsWithDefaults.server.extraArgs = newArgs; - } - const runtime = Runtime.instance(); - const server = await runtime.createEphemeralServer(toNativeEphemeralServerConfig(optsWithDefaults.server)); - const address = native.ephemeralServerGetTarget(server); + server = await runtime.createEphemeralServer(toNativeEphemeralServerConfig(optsWithDefaults.server)); + address = native.ephemeralServerGetTarget(server); + } else { + address = opts.address ?? 'localhost:7233'; + server = 'existing'; + } const nativeConnection = await NativeConnection.connect({ address }); const connection = await Connection.connect({ address }); @@ -203,16 +236,18 @@ export class TestWorkflowEnvironment { console.error(e); /* ignore */ }); - await this.runtime.shutdownEphemeralServer(this.server).catch((e) => { - console.error(e); - /* ignore */ - }); + if (this.server !== 'existing') { + await this.runtime.shutdownEphemeralServer(this.server).catch((e) => { + console.error(e); + /* ignore */ + }); + } } /** * Wait for `durationMs` in "server time". * - * This awaits using regular setTimeout in regular environments, or manually skips time in time-skipping environments. + * This awaits using regular setTimeout in regular environments or manually skips time in time-skipping environments. * * Useful for simulating events far into the future like completion of long running activities. * @@ -281,10 +316,12 @@ export class TestWorkflowEnvironment { * Options for {@link TestWorkflowEnvironment.create} */ type TestWorkflowEnvironmentOptions = { - server: DevServerConfig | TimeSkippingServerConfig; + server: DevServerConfig | TimeSkippingServerConfig | ExistingServerConfig; client?: ClientOptionsForTestEnv; }; +type ExistingServerConfig = { type: 'existing' }; + type TestWorkflowEnvironmentOptionsWithDefaults = Required; function addDefaults(opts: TestWorkflowEnvironmentOptions): TestWorkflowEnvironmentOptionsWithDefaults { diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index c351bdf43..5366c6f7b 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -1,3 +1,4 @@ +import { WorkerDeploymentVersion } from '@temporalio/common'; import type { coresdk, temporal } from '@temporalio/proto'; import { IllegalStateError, ParentWorkflowInfo, RootWorkflowInfo } from '@temporalio/workflow'; @@ -29,6 +30,19 @@ export function convertToParentWorkflowType( }; } +export function convertDeploymentVersion( + v: coresdk.common.IWorkerDeploymentVersion | null | undefined +): WorkerDeploymentVersion | undefined { + if (v == null || v.buildId == null) { + return undefined; + } + + return { + buildId: v.buildId, + deploymentName: v.deploymentName ?? '', + }; +} + export function convertToRootWorkflowType( root: temporal.api.common.v1.IWorkflowExecution | null | undefined ): RootWorkflowInfo | undefined { diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index 44c8e1941..354afd188 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1,7 +1,13 @@ import * as os from 'node:os'; import * as v8 from 'node:v8'; import type { Configuration as WebpackConfiguration } from 'webpack'; -import { ActivityFunction, DataConverter, LoadedDataConverter } from '@temporalio/common'; +import { + ActivityFunction, + DataConverter, + LoadedDataConverter, + VersioningBehavior, + WorkerDeploymentVersion, +} from '@temporalio/common'; import { Duration, msOptionalToNumber, msToNumber } from '@temporalio/common/lib/time'; import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow'; import { LoggerSinks } from '@temporalio/workflow'; @@ -61,6 +67,7 @@ export interface WorkerOptions { * @default `@temporalio/worker` package name and version + checksum of workflow bundle's code * * @experimental The Worker Versioning API is still being designed. Major changes are expected. + * @deprecated Use {@link workerDeploymentOptions} instead. */ buildId?: string; @@ -72,9 +79,17 @@ export interface WorkerOptions { * For more information, see https://docs.temporal.io/workers#worker-versioning * * @experimental The Worker Versioning API is still being designed. Major changes are expected. + * @deprecated Use {@link workerDeploymentOptions} instead. */ useVersioning?: boolean; + /** + * Deployment options for the worker. Exclusive with `build_id` and `use_worker_versioning`. + * + * @experimental Deployment based versioning is still experimental. + */ + workerDeploymentOptions?: WorkerDeploymentOptions; + /** * The namespace this worker will connect to * @@ -555,6 +570,30 @@ export interface PollerBehaviorSimpleMaximum { maximum?: number; } +/** + * Allows specifying the deployment version of the worker and whether to use deployment-based + * worker versioning. + * + * @experimental Deployment based versioning is still experimental. + */ +export type WorkerDeploymentOptions = { + /** + * The deployment version of the worker. + */ + version: WorkerDeploymentVersion; + + /** + * Whether to use deployment-based worker versioning. + */ + useWorkerVersioning: boolean; + + /** + * The default versioning behavior to use for all workflows on this worker. Specifying a default + * behavior is required. + */ + defaultVersioningBehavior: VersioningBehavior; +}; + // Replay Worker /////////////////////////////////////////////////////////////////////////////////// /** @@ -763,8 +802,8 @@ export type CompiledWorkerOptionsWithBuildId = CompiledWorkerOptions & { function addDefaultWorkerOptions(options: WorkerOptions, logger: Logger): WorkerOptionsWithDefaults { const { - buildId, - useVersioning, + buildId, // eslint-disable-line deprecation/deprecation + useVersioning, // eslint-disable-line deprecation/deprecation maxCachedWorkflows, showStackTraceSources, namespace, @@ -924,8 +963,9 @@ export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): Co export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { return { identity: opts.identity, - buildId: opts.buildId, - useVersioning: opts.useVersioning, + buildId: opts.buildId, // eslint-disable-line deprecation/deprecation + useVersioning: opts.useVersioning, // eslint-disable-line deprecation/deprecation + workerDeploymentOptions: toNativeDeploymentOptions(opts.workerDeploymentOptions), taskQueue: opts.taskQueue, namespace: opts.namespace, tuner: opts.tuner, @@ -962,6 +1002,29 @@ export function toNativeTaskPollerBehavior(behavior: Required): } } +function toNativeDeploymentOptions(options?: WorkerDeploymentOptions): native.WorkerDeploymentOptions | null { + if (options === undefined) { + return null; + } + let vb: native.VersioningBehavior; + switch (options.defaultVersioningBehavior) { + case 'PINNED': + vb = { type: 'pinned' }; + break; + case 'AUTO_UPGRADE': + vb = { type: 'auto-upgrade' }; + break; + default: + options.defaultVersioningBehavior satisfies never; + throw new Error(`Unknown versioning behavior: ${options.defaultVersioningBehavior}`); + } + return { + version: options.version, + useWorkerVersioning: options.useWorkerVersioning, + defaultVersioningBehavior: vb, + }; +} + // Utils /////////////////////////////////////////////////////////////////////////////////////////// function isSet(env: string | undefined): boolean { diff --git a/packages/worker/src/worker-tuner.ts b/packages/worker/src/worker-tuner.ts index 303ce3f60..72e95e934 100644 --- a/packages/worker/src/worker-tuner.ts +++ b/packages/worker/src/worker-tuner.ts @@ -1,6 +1,6 @@ import { native } from '@temporalio/core-bridge'; import { Duration, msToNumber } from '@temporalio/common/lib/time'; -import { Logger } from '@temporalio/common'; +import { Logger, WorkerDeploymentVersion } from '@temporalio/common'; /** * A worker tuner allows the customization of the performance characteristics of workers by @@ -210,6 +210,11 @@ export interface LocalActivitySlotInfo { // eslint-disable-next-line @typescript-eslint/no-empty-object-type export interface SlotPermit {} +/** + * Context for reserving a slot. + * + * @experimental Worker Tuner is an experimental feature and may be subject to change. + */ export interface SlotReserveContext { /** * The type of slot trying to be reserved @@ -231,6 +236,11 @@ export interface SlotReserveContext { */ workerBuildId: string; + /** + * The deployment version of the worker that is requesting the reservation + */ + workerDeploymentVersion?: WorkerDeploymentVersion; + /** * True iff this is a reservation for a sticky poll for a workflow task */ @@ -391,7 +401,7 @@ function addResourceBasedSlotDefaults( } } -class NativeifiedCustomSlotSupplier implements CustomSlotSupplier { +class NativeifiedCustomSlotSupplier implements native.CustomSlotSupplierOptions { readonly type = 'custom'; constructor( @@ -404,14 +414,18 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu this.releaseSlot = this.releaseSlot.bind(this); } - async reserveSlot(ctx: SlotReserveContext, abortSignal: AbortSignal): Promise { + async reserveSlot(ctx: native.SlotReserveContext, abortSignal: AbortSignal): Promise { + if (ctx.slotType === 'nexus') { + throw new Error('nexus not yet supported in slot suppliers'); + } try { const result = await this.supplier.reserveSlot( { slotType: ctx.slotType, taskQueue: ctx.taskQueue, workerIdentity: ctx.workerIdentity, - workerBuildId: ctx.workerBuildId, + workerBuildId: ctx.workerDeploymentVersion?.buildId ?? '', + workerDeploymentVersion: ctx.workerDeploymentVersion ?? undefined, isSticky: ctx.isSticky, }, abortSignal @@ -425,13 +439,17 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu } } - tryReserveSlot(ctx: SlotReserveContext): SlotPermit | null { + tryReserveSlot(ctx: native.SlotReserveContext): SlotPermit | null { + if (ctx.slotType === 'nexus') { + throw new Error('nexus not yet supported in slot suppliers'); + } try { const result = this.supplier.tryReserveSlot({ slotType: ctx.slotType, taskQueue: ctx.taskQueue, workerIdentity: ctx.workerIdentity, - workerBuildId: ctx.workerBuildId, + workerBuildId: ctx.workerDeploymentVersion?.buildId ?? '', + workerDeploymentVersion: ctx.workerDeploymentVersion ?? undefined, isSticky: ctx.isSticky, }); return result ?? null; @@ -441,7 +459,7 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu } } - markSlotUsed(ctx: SlotMarkUsedContext): void { + markSlotUsed(ctx: native.SlotMarkUsedContext): void { try { this.supplier.markSlotUsed({ slotInfo: ctx.slotInfo, @@ -452,7 +470,7 @@ class NativeifiedCustomSlotSupplier implements CustomSlotSu } } - releaseSlot(ctx: SlotReleaseContext): void { + releaseSlot(ctx: native.SlotReleaseContext): void { try { this.supplier.releaseSlot({ slotInfo: ctx.slotInfo ?? undefined, diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 6a86c3d18..d93a058cf 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -69,7 +69,12 @@ import { } from './replay'; import { History, Runtime } from './runtime'; import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; -import { byteArrayToBuffer, convertToParentWorkflowType, convertToRootWorkflowType } from './utils'; +import { + byteArrayToBuffer, + convertDeploymentVersion, + convertToParentWorkflowType, + convertToRootWorkflowType, +} from './utils'; import { CompiledWorkerOptions, CompiledWorkerOptionsWithBuildId, @@ -154,7 +159,8 @@ interface WorkflowWithLogAttributes { } function addBuildIdIfMissing(options: CompiledWorkerOptions, bundleCode?: string): CompiledWorkerOptionsWithBuildId { - if (options.buildId != null) { + const bid = options.buildId; // eslint-disable-line deprecation/deprecation + if (bid != null) { return options as CompiledWorkerOptionsWithBuildId; } const suffix = bundleCode ? `+${crypto.createHash('sha256').update(bundleCode).digest('hex')}` : ''; @@ -1262,7 +1268,7 @@ export class Worker { priority, } = initWorkflowJob; - // Note that we can't do payload convertion here, as there's no guarantee that converted payloads would be safe to + // Note that we can't do payload conversion here, as there's no guarantee that converted payloads would be safe to // transfer through the V8 message port. Those will therefore be set in the Activator's initializeWorkflow job handler. const workflowInfo: WorkflowInfo = { workflowId, @@ -1292,7 +1298,8 @@ export class Worker { // A zero value means that it was not set by the server historySize: activation.historySizeBytes.toNumber(), continueAsNewSuggested: activation.continueAsNewSuggested, - currentBuildId: activation.buildIdForCurrentTask, + currentBuildId: activation.deploymentVersionForCurrentTask?.buildId ?? '', + currentDeploymentVersion: convertDeploymentVersion(activation.deploymentVersionForCurrentTask), unsafe: { now: () => Date.now(), // re-set in initRuntime isReplaying: activation.isReplaying, diff --git a/packages/worker/src/workflow/vm-shared.ts b/packages/worker/src/workflow/vm-shared.ts index 7418b2655..96b76fd44 100644 --- a/packages/worker/src/workflow/vm-shared.ts +++ b/packages/worker/src/workflow/vm-shared.ts @@ -14,6 +14,7 @@ import * as internals from '@temporalio/workflow/lib/worker-interface'; import { Activator } from '@temporalio/workflow/lib/internals'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { UnhandledRejectionError } from '../errors'; +import { convertDeploymentVersion } from '../utils'; import { Workflow } from './interface'; import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input'; @@ -348,7 +349,8 @@ export abstract class BaseVMWorkflow implements Workflow { // historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown historySize: activation.historySizeBytes?.toNumber() ?? 0, continueAsNewSuggested: activation.continueAsNewSuggested ?? false, - currentBuildId: activation.buildIdForCurrentTask ?? undefined, + currentBuildId: activation.deploymentVersionForCurrentTask?.buildId ?? '', + currentDeploymentVersion: convertDeploymentVersion(activation.deploymentVersionForCurrentTask), unsafe: { ...info.unsafe, isReplaying: activation.isReplaying ?? false, diff --git a/packages/workflow/src/flags.ts b/packages/workflow/src/flags.ts index 139ee8a5e..1ed209b13 100644 --- a/packages/workflow/src/flags.ts +++ b/packages/workflow/src/flags.ts @@ -72,5 +72,5 @@ type AltConditionFn = (ctx: { info: WorkflowInfo }) => boolean; function buildIdSdkVersionMatches(version: RegExp): AltConditionFn { const regex = new RegExp(`^@temporalio/worker@(${version.source})[+]`); - return ({ info }) => info.currentBuildId != null && regex.test(info.currentBuildId); + return ({ info }) => info.currentBuildId != null && regex.test(info.currentBuildId); // eslint-disable-line deprecation/deprecation } diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 4847a2c37..54f0aa5de 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -12,6 +12,8 @@ import { TypedSearchAttributes, SearchAttributePair, Priority, + WorkerDeploymentVersion, + VersioningBehavior, } from '@temporalio/common'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow/enums-helpers'; @@ -195,9 +197,22 @@ export interface WorkflowInfo { * task was completed by a worker without a Build ID. If this worker is the one executing this * task for the first time and has a Build ID set, then its ID will be used. This value may change * over the lifetime of the workflow run, but is deterministic and safe to use for branching. + * + * @deprecated Use `currentDeploymentVersion` instead */ readonly currentBuildId?: string; + /** + * The Deployment Version of the worker which executed the current Workflow Task. May be undefined + * if the task was completed by a worker without a Deployment Version. If this worker is the one + * executing this task for the first time and has a Deployment Version set, then its ID will be + * used. This value may change over the lifetime of the workflow run, but is deterministic and + * safe to use for branching. + * + * @experimental Deployment based versioning is experimental and may change in the future. + */ + readonly currentDeploymentVersion?: WorkerDeploymentVersion; + readonly unsafe: UnsafeWorkflowInfo; /** @@ -623,4 +638,5 @@ export type UpdateHandlerOptions = { export interface ActivationCompletion { commands: coresdk.workflow_commands.IWorkflowCommand[]; usedInternalFlags: number[]; + versioningBehavior?: VersioningBehavior; } diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 75f7f2d91..b8e86b2a2 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -20,6 +20,9 @@ import { WorkflowUpdateValidatorType, mapFromPayloads, fromPayloadsAtIndex, + WorkflowFunctionWithOptions, + VersioningBehavior, + WorkflowDefinitionOptions, } from '@temporalio/common'; import { decodeSearchAttributes, @@ -382,7 +385,7 @@ export class Activator implements ActivationHandler { /** * Reference to the current Workflow, initialized when a Workflow is started */ - public workflow?: Workflow; + public workflow?: Workflow | WorkflowFunctionWithOptions; /** * Information about the current Workflow @@ -424,6 +427,9 @@ export class Activator implements ActivationHandler { public readonly registeredActivityNames: Set; + public versioningBehavior?: VersioningBehavior; + public workflowDefinitionOptionsGetter?: () => WorkflowDefinitionOptions; + constructor({ info, now, @@ -496,6 +502,7 @@ export class Activator implements ActivationHandler { return { commands: this.commands.splice(0), usedInternalFlags: [...this.knownFlags], + versioningBehavior: this.versioningBehavior, }; } @@ -537,6 +544,9 @@ export class Activator implements ActivationHandler { ? this.failureConverter.failureToError(continuedFailure, this.payloadConverter) : undefined, })); + if (this.workflowDefinitionOptionsGetter) { + this.versioningBehavior = this.workflowDefinitionOptionsGetter().versioningBehavior; + } } public cancelWorkflow(_activation: coresdk.workflow_activation.ICancelWorkflow): void { diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index 0d902c565..c80694c1f 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -3,7 +3,7 @@ * * @module */ -import { IllegalStateError } from '@temporalio/common'; +import { encodeVersioningBehavior, IllegalStateError, isWorkflowFunctionWithOptions } from '@temporalio/common'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { coresdk } from '@temporalio/proto'; import { disableStorage } from './cancellation-scope'; @@ -80,9 +80,27 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { const defaultWorkflowFn = mod['default']; if (typeof workflowFn === 'function') { - activator.workflow = workflowFn; + if (isWorkflowFunctionWithOptions(workflowFn)) { + activator.workflow = workflowFn; + if (typeof workflowFn.workflowDefinitionOptions === 'object') { + activator.versioningBehavior = workflowFn.workflowDefinitionOptions.versioningBehavior; + } else { + activator.workflowDefinitionOptionsGetter = workflowFn.workflowDefinitionOptions; + } + } else { + activator.workflow = workflowFn; + } } else if (typeof defaultWorkflowFn === 'function') { - activator.workflow = defaultWorkflowFn; + if (isWorkflowFunctionWithOptions(defaultWorkflowFn)) { + activator.workflow = defaultWorkflowFn; + if (typeof defaultWorkflowFn.workflowDefinitionOptions === 'object') { + activator.versioningBehavior = defaultWorkflowFn.workflowDefinitionOptions.versioningBehavior; + } else { + activator.workflowDefinitionOptionsGetter = defaultWorkflowFn.workflowDefinitionOptions; + } + } else { + activator.workflow = defaultWorkflowFn; + } } else { const details = workflowFn === undefined @@ -203,7 +221,11 @@ export function concludeActivation(): coresdk.workflow_completion.IWorkflowActiv } return { runId: activator.info.runId, - successful: { ...activationCompletion, commands }, + successful: { + ...activationCompletion, + commands, + versioningBehavior: encodeVersioningBehavior(activationCompletion.versioningBehavior), + }, }; } finally { activator.rethrowSynchronously = false; diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 54b3069a0..d915d6ed7 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -23,6 +23,7 @@ import { WorkflowUpdateValidatorType, SearchAttributeUpdatePair, compilePriority, + WorkflowDefinitionOptionsOrGetter, } from '@temporalio/common'; import { encodeUnifiedSearchAttributes, @@ -1614,6 +1615,43 @@ export function allHandlersFinished(): boolean { return activator.inProgressSignals.size === 0 && activator.inProgressUpdates.size === 0; } +/** + * Can be used to alter workflow functions with certain options specified at definition time. + * + * @example + * For example: + * ```ts + * setWorkflowOptions({ versioningBehavior: 'PINNED' }, myWorkflow); + * export async function myWorkflow(): Promise { + * // Workflow code here + * return "hi"; + * } + * ``` + * + * @example + * To annotate a default or dynamic workflow: + * ```ts + * export default async function (): Promise { + * // Workflow code here + * return "hi"; + * } + * setWorkflowOptions({ versioningBehavior: 'PINNED' }, module.exports.default); + * ``` + * + * @param options Options for the workflow defintion, or a function that returns options. If a + * function is provided, it will be called once just before the workflow function is called for the + * first time. It is safe to call {@link workflowInfo} inside such a function. + * @param fn The workflow function. + */ +export function setWorkflowOptions( + options: WorkflowDefinitionOptionsOrGetter, + fn: (...args: A) => Promise +): void { + Object.assign(fn, { + workflowDefinitionOptions: options, + }); +} + export const stackTraceQuery = defineQuery('__stack_trace'); export const enhancedStackTraceQuery = defineQuery('__enhanced_stack_trace'); export const workflowMetadataQuery = defineQuery('__temporal_workflow_metadata');