Skip to content

Worker Deployment Versioning #1679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
40 changes: 36 additions & 4 deletions packages/client/src/workflow-options.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { CommonWorkflowOptions, SignalDefinition, WithWorkflowArgs, Workflow } from '@temporalio/common';
import {
CommonWorkflowOptions,
SignalDefinition,
WithWorkflowArgs,
Workflow,
VersioningOverride,
toCanonicalString,
} from '@temporalio/common';
import { Duration, msOptionalToTs } from '@temporalio/common/lib/time';
import { Replace } from '@temporalio/common/lib/type-helpers';
import { google } from '@temporalio/proto';
import { google, temporal } from '@temporalio/proto';

export * from '@temporalio/common/lib/workflow-options';

Expand Down Expand Up @@ -38,9 +45,15 @@ export interface WorkflowOptions extends CommonWorkflowOptions {

/**
* Amount of time to wait before starting the workflow.
*
*/
startDelay?: Duration;

/**
* Override the versioning behavior of the Workflow that is about to be started.
*
* @experimental Deployment based versioning is experimental and may change in the future.
*/
versioningOverride?: VersioningOverride;
}

export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
Expand All @@ -50,18 +63,21 @@ export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
workflowRunTimeout?: google.protobuf.IDuration;
workflowTaskTimeout?: google.protobuf.IDuration;
startDelay?: google.protobuf.IDuration;
versioningOverride?: temporal.api.workflow.v1.IVersioningOverride;
}
>;

export function compileWorkflowOptions<T extends WorkflowOptions>(options: T): WithCompiledWorkflowOptions<T> {
const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, ...rest } = options;
const { workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, startDelay, versioningOverride, ...rest } =
options;

return {
...rest,
workflowExecutionTimeout: msOptionalToTs(workflowExecutionTimeout),
workflowRunTimeout: msOptionalToTs(workflowRunTimeout),
workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout),
startDelay: msOptionalToTs(startDelay),
versioningOverride: versioningOverrideToProto(versioningOverride),
};
}

Expand Down Expand Up @@ -109,3 +125,19 @@ export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]
* Options for starting a Workflow
*/
export type WorkflowStartOptions<T extends Workflow = Workflow> = WithWorkflowArgs<T, WorkflowOptions>;

function versioningOverrideToProto(
vo: VersioningOverride | undefined
): temporal.api.workflow.v1.IVersioningOverride | undefined {
if (!vo) return undefined;
// TODO: Remove deprecated field assignments when versioning is non-experimental
if (vo === 'AUTO_UPGRADE') {
return {
behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE,
};
}
return {
behavior: temporal.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED,
pinnedVersion: toCanonicalString(vo.version),
};
}
2 changes: 2 additions & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export * from './logger';
export * from './priority';
export * from './retry-policy';
export type { Timestamp, Duration, StringValue } from './time';
export * from './worker-deployments';
export * from './workflow-definition-options';
export * from './workflow-handle';
export * from './workflow-options';
export * from './versioning-intent';
Expand Down
59 changes: 59 additions & 0 deletions packages/common/src/worker-deployments.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import type { temporal } from '@temporalio/proto';
import { makeProtoEnumConverters } from './internal-workflow';

/**
* Represents the version of a specific worker deployment.
*
* @experimental Deployment based versioning is experimental and may change in the future.
*/
export interface WorkerDeploymentVersion {
readonly buildId: string;
readonly deploymentName: string;
}

/**
* @returns The canonical representation of a deployment version, which is a string in the format
* `deploymentName.buildId`.
*/
export function toCanonicalString(version: WorkerDeploymentVersion): string {
return `${version.deploymentName}.${version.buildId}`;
}

/**
* Specifies when a workflow might move from a worker of one Build Id to another.
*
* * 'PINNED' - The workflow will be pinned to the current Build ID unless manually moved.
* * 'AUTO_UPGRADE' - The workflow will automatically move to the latest version (default Build ID
* of the task queue) when the next task is dispatched.
*
* @experimental Deployment based versioning is experimental and may change in the future.
*/
export const VersioningBehavior = {
PINNED: 'PINNED',
AUTO_UPGRADE: 'AUTO_UPGRADE',
} as const;
export type VersioningBehavior = (typeof VersioningBehavior)[keyof typeof VersioningBehavior];

export const [encodeVersioningBehavior, decodeVersioningBehavior] = makeProtoEnumConverters<
temporal.api.enums.v1.VersioningBehavior,
typeof temporal.api.enums.v1.VersioningBehavior,
keyof typeof temporal.api.enums.v1.VersioningBehavior,
typeof VersioningBehavior,
'VERSIONING_BEHAVIOR_'
>(
{
[VersioningBehavior.PINNED]: 1,
[VersioningBehavior.AUTO_UPGRADE]: 2,
UNSPECIFIED: 0,
} as const,
'VERSIONING_BEHAVIOR_'
);

/**
* Represents versioning overrides. For example, when starting workflows.
*
* If set to 'AUTO_UPGRADE', the Workflow will run as if it is using {@link VersioningBehavior.AUTO_UPGRADE}.
* Otherwise, you select a pinned behavior, and a specific version to pin to. Currently only one
* such behavior exists, but more will be added in the future.
*/
export type VersioningOverride = { pinned_behavior: 'PINNED'; version: WorkerDeploymentVersion } | 'AUTO_UPGRADE';
18 changes: 18 additions & 0 deletions packages/common/src/workflow-definition-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { VersioningBehavior } from './worker-deployments';

/**
* Options that can be used when defining a workflow via {@link setWorkflowOptions}.
*/
export interface WorkflowDefinitionOptions {
versioningBehavior?: VersioningBehavior;
}

type AsyncFunction<Args extends any[], ReturnType> = (...args: Args) => Promise<ReturnType>;
export type WorkflowDefinitionOptionsOrGetter = WorkflowDefinitionOptions | (() => WorkflowDefinitionOptions);

/**
* A workflow function that has been defined with options from {@link WorkflowDefinitionOptions}.
*/
export interface WorkflowFunctionWithOptions<Args extends any[], ReturnType> extends AsyncFunction<Args, ReturnType> {
workflowDefinitionOptions: WorkflowDefinitionOptionsOrGetter;
}
11 changes: 10 additions & 1 deletion packages/common/src/workflow-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Duration } from './time';
import { makeProtoEnumConverters } from './internal-workflow';
import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes';
import { Priority } from './priority';
import { WorkflowFunctionWithOptions } from './workflow-definition-options';

/**
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
Expand Down Expand Up @@ -243,7 +244,9 @@ export interface WorkflowDurationOptions {

export type CommonWorkflowOptions = BaseWorkflowOptions & WorkflowDurationOptions;

export function extractWorkflowType<T extends Workflow>(workflowTypeOrFunc: string | T): string {
export function extractWorkflowType<T extends Workflow>(
workflowTypeOrFunc: string | T | WorkflowFunctionWithOptions<any[], any>
): string {
if (typeof workflowTypeOrFunc === 'string') return workflowTypeOrFunc as string;
if (typeof workflowTypeOrFunc === 'function') {
if (workflowTypeOrFunc?.name) return workflowTypeOrFunc.name;
Expand All @@ -253,3 +256,9 @@ export function extractWorkflowType<T extends Workflow>(workflowTypeOrFunc: stri
`Invalid workflow type: expected either a string or a function, got '${typeof workflowTypeOrFunc}'`
);
}

/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
export function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions<any[], any> {
if (obj == null) return false;
return Object.hasOwn(obj, 'workflowDefinitionOptions');
}
3 changes: 3 additions & 0 deletions packages/core-bridge/bridge-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use proc_macro::TokenStream;
use syn::{DeriveInput, parse_macro_input};

/// Procedural macro for defining bridge types with compile-time field name conversion
///
/// Note that enum types must all be defined on the JS side as objects with a `type` field which
/// is the kebab-case representation of the enum variant.
#[proc_macro_derive(TryFromJs)]
pub fn try_from_js(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
Expand Down
Loading
Loading