Skip to content

feat: Priority annotations #1669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 18, 2025
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:

# Use these variables to force specific version of CLI/Time Skipping Server for SDK tests
# TESTS_CLI_VERSION: 'v0.13.2'
TESTS_CLI_VERSION: 'v1.3.1-persistence-fix.0'
# TESTS_TIME_SKIPPING_SERVER_VERSION: 'v1.24.1'

jobs:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ env:

# Use these variables to force specific version of CLI/Time Skipping Server for SDK tests
# TESTS_CLI_VERSION: 'v0.13.2'
TESTS_CLI_VERSION: 'v1.3.1-persistence-fix.0'
# TESTS_TIME_SKIPPING_SERVER_VERSION: 'v1.24.1'

jobs:
Expand Down
6 changes: 5 additions & 1 deletion packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
*/

import { AsyncLocalStorage } from 'node:async_hooks';
import { Logger, Duration, LogLevel, LogMetadata } from '@temporalio/common';
import { Logger, Duration, LogLevel, LogMetadata, Priority } from '@temporalio/common';
import { msToNumber } from '@temporalio/common/lib/time';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';

Expand Down Expand Up @@ -198,6 +198,10 @@ export interface Info {
* For Local Activities, this is set to the Workflow's Task Queue.
*/
readonly taskQueue: string;
/**
* Priority of this activity
*/
readonly priority?: Priority;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ServiceError as GrpcServiceError, status as grpcStatus } from '@grpc/grpc-js';
import { LoadedDataConverter, NamespaceNotFoundError } from '@temporalio/common';
import { decodePriority, LoadedDataConverter, NamespaceNotFoundError } from '@temporalio/common';
import {
decodeSearchAttributes,
decodeTypedSearchAttributes,
Expand Down Expand Up @@ -79,6 +79,7 @@ export async function executionInfoFromRaw<T>(
}
: undefined,
raw: rawDataToEmbed,
priority: decodePriority(raw.priority),
};
}

Expand Down
11 changes: 10 additions & 1 deletion packages/client/src/schedule-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import Long from 'long'; // eslint-disable-line import/no-named-as-default
import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, LoadedDataConverter } from '@temporalio/common';
import {
compilePriority,
compileRetryPolicy,
decodePriority,
decompileRetryPolicy,
extractWorkflowType,
LoadedDataConverter,
} from '@temporalio/common';
import {
encodeUnifiedSearchAttributes,
decodeSearchAttributes,
Expand Down Expand Up @@ -263,6 +270,7 @@ export async function encodeScheduleAction(
}
: undefined,
header: { fields: headers },
priority: action.priority ? compilePriority(action.priority) : undefined,
},
};
}
Expand Down Expand Up @@ -328,6 +336,7 @@ export async function decodeScheduleAction(
workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout),
workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout),
workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout),
priority: decodePriority(pb.startWorkflow.priority),
};
}
throw new TypeError('Unsupported schedule action');
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/schedule-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo
| 'workflowExecutionTimeout'
| 'workflowRunTimeout'
| 'workflowTaskTimeout'
| 'priority'
>;

// Invariant: an existing ScheduleDescriptionAction can be used as is to create or update a schedule
Expand Down
3 changes: 2 additions & 1 deletion packages/client/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type * as grpc from '@grpc/grpc-js';
import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue } from '@temporalio/common';
import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue, Priority } from '@temporalio/common';
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import * as proto from '@temporalio/proto';
import { Replace } from '@temporalio/common/lib/type-helpers';
Expand Down Expand Up @@ -52,6 +52,7 @@ export interface WorkflowExecutionInfo {
typedSearchAttributes: TypedSearchAttributes;
parentExecution?: Required<proto.temporal.api.common.v1.IWorkflowExecution>;
raw: RawWorkflowExecutionInfo;
priority?: Priority;
}

export interface CountWorkflowExecution {
Expand Down
3 changes: 3 additions & 0 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
decodeRetryState,
encodeWorkflowIdConflictPolicy,
WorkflowIdConflictPolicy,
compilePriority,
} from '@temporalio/common';
import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
Expand Down Expand Up @@ -1225,6 +1226,7 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: options.cronSchedule,
header: { fields: headers },
priority: options.priority ? compilePriority(options.priority) : undefined,
};
try {
return (await this.workflowService.signalWithStartWorkflowExecution(req)).runId;
Expand Down Expand Up @@ -1293,6 +1295,7 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: opts.cronSchedule,
header: { fields: headers },
priority: opts.priority ? compilePriority(opts.priority) : undefined,
};
}

Expand Down
6 changes: 6 additions & 0 deletions packages/common/src/activity-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { RetryPolicy } from './retry-policy';
import { Duration } from './time';
import { VersioningIntent } from './versioning-intent';
import { makeProtoEnumConverters } from './internal-workflow';
import { Priority } from './priority';

export const ActivityCancellationType = {
TRY_CANCEL: 'TRY_CANCEL',
Expand Down Expand Up @@ -122,6 +123,11 @@ export interface ActivityOptions {
* @experimental The Worker Versioning API is still being designed. Major changes are expected.
*/
versioningIntent?: VersioningIntent;

/**
* Priority of this activity
*/
priority?: Priority;
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './failure';
export { Headers, Next } from './interceptors';
export * from './interfaces';
export * from './logger';
export * from './priority';
export * from './retry-policy';
export type { Timestamp, Duration, StringValue } from './time';
export * from './workflow-handle';
Expand Down
54 changes: 54 additions & 0 deletions packages/common/src/priority.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { temporal } from '@temporalio/proto';

/**
* Priority contains metadata that controls relative ordering of task processing when tasks are
* backlogged in a queue. Initially, Priority will be used in activity and workflow task queues,
* which are typically where backlogs exist.
* Priority is (for now) attached to workflows and activities. Activities and child workflows
* inherit Priority from the workflow that created them, but may override fields when they are
* started or modified. For each field of a Priority on an activity/workflow, not present or equal
* to zero/empty string means to inherit the value from the calling workflow, or if there is no
* calling workflow, then use the default (documented on the field).
* The overall semantics of Priority are:
* 1. First, consider "priority_key": lower number goes first.
* (more will be added here later)
*/
export interface Priority {
/**
* Priority key is a positive integer from 1 to n, where smaller integers
* correspond to higher priorities (tasks run sooner). In general, tasks in
* a queue should be processed in close to priority order, although small
* deviations are possible.
*
* The maximum priority value (minimum priority) is determined by server configuration, and
* defaults to 5.
*
* The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes out to 3.
*/
priorityKey?: number;
}

/**
* Turn a proto compatible Priority into a TS Priority
*/
export function decodePriority(priority?: temporal.api.common.v1.IPriority | null): Priority {
return { priorityKey: priority?.priorityKey ?? undefined };
}

/**
* Turn a TS Priority into a proto compatible Priority
*/
export function compilePriority(priority: Priority): temporal.api.common.v1.IPriority {
if (priority.priorityKey !== undefined && priority.priorityKey !== null) {
if (!Number.isInteger(priority.priorityKey)) {
throw new TypeError('priorityKey must be an integer');
}
if (priority.priorityKey < 0) {
throw new RangeError('priorityKey must be a positive integer');
}
}

return {
priorityKey: priority.priorityKey ?? 0,
};
}
6 changes: 6 additions & 0 deletions packages/common/src/workflow-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { RetryPolicy } from './retry-policy';
import { Duration } from './time';
import { makeProtoEnumConverters } from './internal-workflow';
import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes';
import { Priority } from './priority';

/**
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
Expand Down Expand Up @@ -190,6 +191,11 @@ export interface BaseWorkflowOptions {
* by {@link typedSearchAttributes}.
*/
typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes;

/**
* Priority of a workflow
*/
priority?: Priority;
}

export type WithWorkflowArgs<W extends Workflow, T> = T &
Expand Down
1 change: 1 addition & 0 deletions packages/test/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const RUN_TIME_SKIPPING_TESTS =
inWorkflowContext() || !(process.platform === 'linux' && process.arch === 'arm64');

export const TESTS_CLI_VERSION = inWorkflowContext() ? '' : process.env.TESTS_CLI_VERSION;

export const TESTS_TIME_SKIPPING_SERVER_VERSION = inWorkflowContext()
? ''
: process.env.TESTS_TIME_SKIPPING_SERVER_VERSION;
Expand Down
1 change: 1 addition & 0 deletions packages/test/src/test-integration-split-one.ts
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ test('Workflow can read WorkflowInfo', configMacro, async (t, config) => {
currentBuildId: res.currentBuildId,
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
priority: {},
});
});

Expand Down
54 changes: 54 additions & 0 deletions packages/test/src/test-integration-split-three.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import v8 from 'node:v8';
import { readFileSync } from 'node:fs';
import pkg from '@temporalio/worker/lib/pkg';
import { bundleWorkflowCode } from '@temporalio/worker';
import { temporal } from '@temporalio/proto';
import { configMacro, makeTestFn } from './helpers-integration-multi-codec';
import { configurableHelpers } from './helpers-integration';
import { withZeroesHTTPServer } from './zeroes-http-server';
Expand Down Expand Up @@ -140,3 +141,56 @@ if ('promiseHooks' in v8) {
t.deepEqual(Object.entries(enhancedStack.sources), expectedSources);
});
}

test(
'priorities can be specified and propagated across child workflows and activities',
configMacro,
async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
const handle = await startWorkflow(workflows.priorityWorkflow, {
args: [false, 1],
priority: { priorityKey: 1 },
});
await worker.runUntil(handle.result());
let firstChild = true;
const history = await handle.fetchHistory();
console.log('events');
for (const event of history?.events ?? []) {
switch (event.eventType) {
case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
t.deepEqual(event.workflowExecutionStartedEventAttributes?.priority?.priorityKey, 1);
break;
case temporal.api.enums.v1.EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED: {
const pri = event.startChildWorkflowExecutionInitiatedEventAttributes?.priority?.priorityKey;
if (firstChild) {
t.deepEqual(pri, 4);
firstChild = false;
} else {
t.deepEqual(pri, 2);
}
break;
}
case temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
t.deepEqual(event.activityTaskScheduledEventAttributes?.priority?.priorityKey, 5);
break;
}
}
}
);

test('workflow start without priorities sees undefined for the key', configMacro, async (t, config) => {
const { env, createWorkerWithDefaults } = config;
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
const worker = await createWorkerWithDefaults(t, { activities });
console.log('STARTING WORKFLOW');

const handle1 = await startWorkflow(workflows.priorityWorkflow, {
args: [true, undefined],
});
await worker.runUntil(handle1.result());

// check occurs in the workflow, need an assert in the test itself in order to run
t.true(true);
});
3 changes: 3 additions & 0 deletions packages/test/src/test-sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ if (RUN_INTEGRATION_TESTS) {
unsafe: {
isReplaying: false,
} as UnsafeWorkflowInfo,
priority: {
priorityKey: undefined,
},
};

t.deepEqual(recordedCalls, [
Expand Down
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export * from './noncancellable-shields-children';
export * from './partial-noncancelable';
export * from './patched';
export * from './patched-top-level';
export * from './priority';
export * from './promise-all';
export * from './promise-race';
export * from './promise-then-promise';
Expand Down
26 changes: 26 additions & 0 deletions packages/test/src/workflows/priority.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { executeChild, proxyActivities, startChild, workflowInfo } from '@temporalio/workflow';
import type * as activities from '../activities';

const { echo } = proxyActivities<typeof activities>({ startToCloseTimeout: '5s', priority: { priorityKey: 5 } });

export async function priorityWorkflow(stopAfterCheck: boolean, expectedPriority: number | undefined): Promise<void> {
const info = workflowInfo();
if (!info.priority) {
throw new Error(`undefined priority`);
}
if (info.priority?.priorityKey !== expectedPriority) {
throw new Error(
`workflow priority ${info.priority?.priorityKey} doesn't match expected priority ${expectedPriority}`
);
}
if (stopAfterCheck) {
return;
}

await executeChild(priorityWorkflow, { args: [true, 4], priority: { priorityKey: 4 } });

const child = await startChild(priorityWorkflow, { args: [true, 2], priority: { priorityKey: 2 } });
await child.result();

await echo('hi');
}
1 change: 1 addition & 0 deletions packages/testing/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ export const defaultActivityInfo: activity.Info = {
startToCloseTimeoutMs: 1000,
scheduleToCloseTimeoutMs: 1000,
currentAttemptScheduledTimestampMs: 1,
priority: undefined,
};

/**
Expand Down
4 changes: 4 additions & 0 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
ApplicationFailure,
ensureApplicationFailure,
TypedSearchAttributes,
decodePriority,
} from '@temporalio/common';
import {
decodeArrayFromPayloads,
Expand Down Expand Up @@ -1268,6 +1269,7 @@ export class Worker {
cronSchedule,
workflowExecutionExpirationTime,
cronScheduleToScheduleInterval,
priority,
} = initWorkflowJob;

// Note that we can't do payload convertion here, as there's no guarantee that converted payloads would be safe to
Expand Down Expand Up @@ -1304,6 +1306,7 @@ export class Worker {
now: () => Date.now(), // re-set in initRuntime
isReplaying: activation.isReplaying,
},
priority: decodePriority(priority),
};
const logAttributes = workflowLogAttributes(workflowInfo);
this.logger.trace('Creating workflow', logAttributes);
Expand Down Expand Up @@ -1898,6 +1901,7 @@ async function extractActivityInfo(
start.currentAttemptScheduledTime,
'currentAttemptScheduledTime'
),
priority: decodePriority(start.priority),
};
}

Expand Down
Loading
Loading