Skip to content

Commit 58df441

Browse files
authored
feat: Priority annotations (#1669)
1 parent 0097ccf commit 58df441

22 files changed

+192
-4
lines changed

.github/workflows/ci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919

2020
# Use these variables to force specific version of CLI/Time Skipping Server for SDK tests
2121
# TESTS_CLI_VERSION: 'v0.13.2'
22+
TESTS_CLI_VERSION: 'v1.3.1-persistence-fix.0'
2223
# TESTS_TIME_SKIPPING_SERVER_VERSION: 'v1.24.1'
2324

2425
jobs:

.github/workflows/release.yml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ env:
1414

1515
# Use these variables to force specific version of CLI/Time Skipping Server for SDK tests
1616
# TESTS_CLI_VERSION: 'v0.13.2'
17+
TESTS_CLI_VERSION: 'v1.3.1-persistence-fix.0'
1718
# TESTS_TIME_SKIPPING_SERVER_VERSION: 'v1.24.1'
1819

1920
jobs:

packages/activity/src/index.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
*/
7171

7272
import { AsyncLocalStorage } from 'node:async_hooks';
73-
import { Logger, Duration, LogLevel, LogMetadata } from '@temporalio/common';
73+
import { Logger, Duration, LogLevel, LogMetadata, Priority } from '@temporalio/common';
7474
import { msToNumber } from '@temporalio/common/lib/time';
7575
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
7676

@@ -198,6 +198,10 @@ export interface Info {
198198
* For Local Activities, this is set to the Workflow's Task Queue.
199199
*/
200200
readonly taskQueue: string;
201+
/**
202+
* Priority of this activity
203+
*/
204+
readonly priority?: Priority;
201205
}
202206

203207
/**

packages/client/src/helpers.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ServiceError as GrpcServiceError, status as grpcStatus } from '@grpc/grpc-js';
2-
import { LoadedDataConverter, NamespaceNotFoundError } from '@temporalio/common';
2+
import { decodePriority, LoadedDataConverter, NamespaceNotFoundError } from '@temporalio/common';
33
import {
44
decodeSearchAttributes,
55
decodeTypedSearchAttributes,
@@ -79,6 +79,7 @@ export async function executionInfoFromRaw<T>(
7979
}
8080
: undefined,
8181
raw: rawDataToEmbed,
82+
priority: decodePriority(raw.priority),
8283
};
8384
}
8485

packages/client/src/schedule-helpers.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import Long from 'long'; // eslint-disable-line import/no-named-as-default
2-
import { compileRetryPolicy, decompileRetryPolicy, extractWorkflowType, LoadedDataConverter } from '@temporalio/common';
2+
import {
3+
compilePriority,
4+
compileRetryPolicy,
5+
decodePriority,
6+
decompileRetryPolicy,
7+
extractWorkflowType,
8+
LoadedDataConverter,
9+
} from '@temporalio/common';
310
import {
411
encodeUnifiedSearchAttributes,
512
decodeSearchAttributes,
@@ -263,6 +270,7 @@ export async function encodeScheduleAction(
263270
}
264271
: undefined,
265272
header: { fields: headers },
273+
priority: action.priority ? compilePriority(action.priority) : undefined,
266274
},
267275
};
268276
}
@@ -328,6 +336,7 @@ export async function decodeScheduleAction(
328336
workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout),
329337
workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout),
330338
workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout),
339+
priority: decodePriority(pb.startWorkflow.priority),
331340
};
332341
}
333342
throw new TypeError('Unsupported schedule action');

packages/client/src/schedule-types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ export type ScheduleDescriptionStartWorkflowAction = ScheduleSummaryStartWorkflo
815815
| 'workflowExecutionTimeout'
816816
| 'workflowRunTimeout'
817817
| 'workflowTaskTimeout'
818+
| 'priority'
818819
>;
819820

820821
// Invariant: an existing ScheduleDescriptionAction can be used as is to create or update a schedule

packages/client/src/types.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type * as grpc from '@grpc/grpc-js';
2-
import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue } from '@temporalio/common';
2+
import type { TypedSearchAttributes, SearchAttributes, SearchAttributeValue, Priority } from '@temporalio/common';
33
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
44
import * as proto from '@temporalio/proto';
55
import { Replace } from '@temporalio/common/lib/type-helpers';
@@ -52,6 +52,7 @@ export interface WorkflowExecutionInfo {
5252
typedSearchAttributes: TypedSearchAttributes;
5353
parentExecution?: Required<proto.temporal.api.common.v1.IWorkflowExecution>;
5454
raw: RawWorkflowExecutionInfo;
55+
priority?: Priority;
5556
}
5657

5758
export interface CountWorkflowExecution {

packages/client/src/workflow-client.ts

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
decodeRetryState,
2323
encodeWorkflowIdConflictPolicy,
2424
WorkflowIdConflictPolicy,
25+
compilePriority,
2526
} from '@temporalio/common';
2627
import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
2728
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
@@ -1225,6 +1226,7 @@ export class WorkflowClient extends BaseClient {
12251226
: undefined,
12261227
cronSchedule: options.cronSchedule,
12271228
header: { fields: headers },
1229+
priority: options.priority ? compilePriority(options.priority) : undefined,
12281230
};
12291231
try {
12301232
return (await this.workflowService.signalWithStartWorkflowExecution(req)).runId;
@@ -1293,6 +1295,7 @@ export class WorkflowClient extends BaseClient {
12931295
: undefined,
12941296
cronSchedule: opts.cronSchedule,
12951297
header: { fields: headers },
1298+
priority: opts.priority ? compilePriority(opts.priority) : undefined,
12961299
};
12971300
}
12981301

packages/common/src/activity-options.ts

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { RetryPolicy } from './retry-policy';
33
import { Duration } from './time';
44
import { VersioningIntent } from './versioning-intent';
55
import { makeProtoEnumConverters } from './internal-workflow';
6+
import { Priority } from './priority';
67

78
export const ActivityCancellationType = {
89
TRY_CANCEL: 'TRY_CANCEL',
@@ -122,6 +123,11 @@ export interface ActivityOptions {
122123
* @experimental The Worker Versioning API is still being designed. Major changes are expected.
123124
*/
124125
versioningIntent?: VersioningIntent;
126+
127+
/**
128+
* Priority of this activity
129+
*/
130+
priority?: Priority;
125131
}
126132

127133
/**

packages/common/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export * from './failure';
1919
export { Headers, Next } from './interceptors';
2020
export * from './interfaces';
2121
export * from './logger';
22+
export * from './priority';
2223
export * from './retry-policy';
2324
export type { Timestamp, Duration, StringValue } from './time';
2425
export * from './workflow-handle';

packages/common/src/priority.ts

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import type { temporal } from '@temporalio/proto';
2+
3+
/**
4+
* Priority contains metadata that controls relative ordering of task processing when tasks are
5+
* backlogged in a queue. Initially, Priority will be used in activity and workflow task queues,
6+
* which are typically where backlogs exist.
7+
* Priority is (for now) attached to workflows and activities. Activities and child workflows
8+
* inherit Priority from the workflow that created them, but may override fields when they are
9+
* started or modified. For each field of a Priority on an activity/workflow, not present or equal
10+
* to zero/empty string means to inherit the value from the calling workflow, or if there is no
11+
* calling workflow, then use the default (documented on the field).
12+
* The overall semantics of Priority are:
13+
* 1. First, consider "priority_key": lower number goes first.
14+
* (more will be added here later)
15+
*/
16+
export interface Priority {
17+
/**
18+
* Priority key is a positive integer from 1 to n, where smaller integers
19+
* correspond to higher priorities (tasks run sooner). In general, tasks in
20+
* a queue should be processed in close to priority order, although small
21+
* deviations are possible.
22+
*
23+
* The maximum priority value (minimum priority) is determined by server configuration, and
24+
* defaults to 5.
25+
*
26+
* The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes out to 3.
27+
*/
28+
priorityKey?: number;
29+
}
30+
31+
/**
32+
* Turn a proto compatible Priority into a TS Priority
33+
*/
34+
export function decodePriority(priority?: temporal.api.common.v1.IPriority | null): Priority {
35+
return { priorityKey: priority?.priorityKey ?? undefined };
36+
}
37+
38+
/**
39+
* Turn a TS Priority into a proto compatible Priority
40+
*/
41+
export function compilePriority(priority: Priority): temporal.api.common.v1.IPriority {
42+
if (priority.priorityKey !== undefined && priority.priorityKey !== null) {
43+
if (!Number.isInteger(priority.priorityKey)) {
44+
throw new TypeError('priorityKey must be an integer');
45+
}
46+
if (priority.priorityKey < 0) {
47+
throw new RangeError('priorityKey must be a positive integer');
48+
}
49+
}
50+
51+
return {
52+
priorityKey: priority.priorityKey ?? 0,
53+
};
54+
}

packages/common/src/workflow-options.ts

+6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { RetryPolicy } from './retry-policy';
44
import { Duration } from './time';
55
import { makeProtoEnumConverters } from './internal-workflow';
66
import { SearchAttributePair, SearchAttributes, TypedSearchAttributes } from './search-attributes';
7+
import { Priority } from './priority';
78

89
/**
910
* Defines what happens when trying to start a Workflow with the same ID as a *Closed* Workflow.
@@ -190,6 +191,11 @@ export interface BaseWorkflowOptions {
190191
* by {@link typedSearchAttributes}.
191192
*/
192193
typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes;
194+
195+
/**
196+
* Priority of a workflow
197+
*/
198+
priority?: Priority;
193199
}
194200

195201
export type WithWorkflowArgs<W extends Workflow, T> = T &

packages/test/src/helpers.ts

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export const RUN_TIME_SKIPPING_TESTS =
3939
inWorkflowContext() || !(process.platform === 'linux' && process.arch === 'arm64');
4040

4141
export const TESTS_CLI_VERSION = inWorkflowContext() ? '' : process.env.TESTS_CLI_VERSION;
42+
4243
export const TESTS_TIME_SKIPPING_SERVER_VERSION = inWorkflowContext()
4344
? ''
4445
: process.env.TESTS_TIME_SKIPPING_SERVER_VERSION;

packages/test/src/test-integration-split-one.ts

+1
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ test('Workflow can read WorkflowInfo', configMacro, async (t, config) => {
736736
currentBuildId: res.currentBuildId,
737737
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
738738
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
739+
priority: {},
739740
});
740741
});
741742

packages/test/src/test-integration-split-three.ts

+54
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import v8 from 'node:v8';
33
import { readFileSync } from 'node:fs';
44
import pkg from '@temporalio/worker/lib/pkg';
55
import { bundleWorkflowCode } from '@temporalio/worker';
6+
import { temporal } from '@temporalio/proto';
67
import { configMacro, makeTestFn } from './helpers-integration-multi-codec';
78
import { configurableHelpers } from './helpers-integration';
89
import { withZeroesHTTPServer } from './zeroes-http-server';
@@ -140,3 +141,56 @@ if ('promiseHooks' in v8) {
140141
t.deepEqual(Object.entries(enhancedStack.sources), expectedSources);
141142
});
142143
}
144+
145+
test(
146+
'priorities can be specified and propagated across child workflows and activities',
147+
configMacro,
148+
async (t, config) => {
149+
const { env, createWorkerWithDefaults } = config;
150+
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
151+
const worker = await createWorkerWithDefaults(t, { activities });
152+
const handle = await startWorkflow(workflows.priorityWorkflow, {
153+
args: [false, 1],
154+
priority: { priorityKey: 1 },
155+
});
156+
await worker.runUntil(handle.result());
157+
let firstChild = true;
158+
const history = await handle.fetchHistory();
159+
console.log('events');
160+
for (const event of history?.events ?? []) {
161+
switch (event.eventType) {
162+
case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
163+
t.deepEqual(event.workflowExecutionStartedEventAttributes?.priority?.priorityKey, 1);
164+
break;
165+
case temporal.api.enums.v1.EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED: {
166+
const pri = event.startChildWorkflowExecutionInitiatedEventAttributes?.priority?.priorityKey;
167+
if (firstChild) {
168+
t.deepEqual(pri, 4);
169+
firstChild = false;
170+
} else {
171+
t.deepEqual(pri, 2);
172+
}
173+
break;
174+
}
175+
case temporal.api.enums.v1.EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
176+
t.deepEqual(event.activityTaskScheduledEventAttributes?.priority?.priorityKey, 5);
177+
break;
178+
}
179+
}
180+
}
181+
);
182+
183+
test('workflow start without priorities sees undefined for the key', configMacro, async (t, config) => {
184+
const { env, createWorkerWithDefaults } = config;
185+
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
186+
const worker = await createWorkerWithDefaults(t, { activities });
187+
console.log('STARTING WORKFLOW');
188+
189+
const handle1 = await startWorkflow(workflows.priorityWorkflow, {
190+
args: [true, undefined],
191+
});
192+
await worker.runUntil(handle1.result());
193+
194+
// check occurs in the workflow, need an assert in the test itself in order to run
195+
t.true(true);
196+
});

packages/test/src/test-sinks.ts

+3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ if (RUN_INTEGRATION_TESTS) {
133133
unsafe: {
134134
isReplaying: false,
135135
} as UnsafeWorkflowInfo,
136+
priority: {
137+
priorityKey: undefined,
138+
},
136139
};
137140

138141
t.deepEqual(recordedCalls, [

packages/test/src/workflows/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export * from './noncancellable-shields-children';
5151
export * from './partial-noncancelable';
5252
export * from './patched';
5353
export * from './patched-top-level';
54+
export * from './priority';
5455
export * from './promise-all';
5556
export * from './promise-race';
5657
export * from './promise-then-promise';
+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { executeChild, proxyActivities, startChild, workflowInfo } from '@temporalio/workflow';
2+
import type * as activities from '../activities';
3+
4+
const { echo } = proxyActivities<typeof activities>({ startToCloseTimeout: '5s', priority: { priorityKey: 5 } });
5+
6+
export async function priorityWorkflow(stopAfterCheck: boolean, expectedPriority: number | undefined): Promise<void> {
7+
const info = workflowInfo();
8+
if (!info.priority) {
9+
throw new Error(`undefined priority`);
10+
}
11+
if (info.priority?.priorityKey !== expectedPriority) {
12+
throw new Error(
13+
`workflow priority ${info.priority?.priorityKey} doesn't match expected priority ${expectedPriority}`
14+
);
15+
}
16+
if (stopAfterCheck) {
17+
return;
18+
}
19+
20+
await executeChild(priorityWorkflow, { args: [true, 4], priority: { priorityKey: 4 } });
21+
22+
const child = await startChild(priorityWorkflow, { args: [true, 2], priority: { priorityKey: 2 } });
23+
await child.result();
24+
25+
await echo('hi');
26+
}

packages/testing/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ export const defaultActivityInfo: activity.Info = {
427427
startToCloseTimeoutMs: 1000,
428428
scheduleToCloseTimeoutMs: 1000,
429429
currentAttemptScheduledTimestampMs: 1,
430+
priority: undefined,
430431
};
431432

432433
/**

packages/worker/src/worker.ts

+4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
ApplicationFailure,
3333
ensureApplicationFailure,
3434
TypedSearchAttributes,
35+
decodePriority,
3536
} from '@temporalio/common';
3637
import {
3738
decodeArrayFromPayloads,
@@ -1268,6 +1269,7 @@ export class Worker {
12681269
cronSchedule,
12691270
workflowExecutionExpirationTime,
12701271
cronScheduleToScheduleInterval,
1272+
priority,
12711273
} = initWorkflowJob;
12721274

12731275
// Note that we can't do payload convertion here, as there's no guarantee that converted payloads would be safe to
@@ -1304,6 +1306,7 @@ export class Worker {
13041306
now: () => Date.now(), // re-set in initRuntime
13051307
isReplaying: activation.isReplaying,
13061308
},
1309+
priority: decodePriority(priority),
13071310
};
13081311
const logAttributes = workflowLogAttributes(workflowInfo);
13091312
this.logger.trace('Creating workflow', logAttributes);
@@ -1898,6 +1901,7 @@ async function extractActivityInfo(
18981901
start.currentAttemptScheduledTime,
18991902
'currentAttemptScheduledTime'
19001903
),
1904+
priority: decodePriority(start.priority),
19011905
};
19021906
}
19031907

0 commit comments

Comments
 (0)