Skip to content

Commit 5f5103a

Browse files
committed
init impl for special behaviour for temporal prefixes. Default signal test needs to be fixed, need to add behaviour reserving prefixes from workflows, and waiting for default update to be merged to add behaviour preventing default update handler to be called with reserved names
1 parent 4155dad commit 5f5103a

File tree

9 files changed

+262
-9
lines changed

9 files changed

+262
-9
lines changed

packages/common/src/reserved.ts

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
export const TEMPORAL_RESERVED_PREFIX = '__temporal_';
2+
export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace';
3+
export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace';
4+
5+
export const reservedPrefixes = [
6+
TEMPORAL_RESERVED_PREFIX,
7+
STACK_TRACE_RESERVED_PREFIX,
8+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
9+
];
10+
11+
export function throwIfReservedName(type: string, name: string): void {
12+
const prefix = isReservedName(name);
13+
if (prefix) {
14+
throw Error(`Cannot register ${type} name: '${name}', with reserved prefix: '${prefix}'`);
15+
}
16+
}
17+
18+
export function isReservedName(name: string): string | undefined {
19+
for (const prefix of reservedPrefixes) {
20+
if (name.startsWith(prefix)) {
21+
return prefix;
22+
}
23+
}
24+
}

packages/test/src/test-integration-split-two.ts

+97-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,16 @@ import {
1414
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
1515
import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow';
1616

17-
import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow';
17+
import {
18+
condition,
19+
defineQuery,
20+
defineSignal,
21+
defineUpdate,
22+
setDefaultQueryHandler,
23+
setHandler,
24+
sleep,
25+
} from '@temporalio/workflow';
26+
import { reservedPrefixes } from '@temporalio/common/lib/reserved';
1827
import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration';
1928
import * as activities from './activities';
2029
import * as workflows from './workflows';
@@ -751,3 +760,90 @@ test('default query handler is not used if requested query exists', configMacro,
751760
t.deepEqual(result, { name: definedQuery.name, args });
752761
});
753762
});
763+
764+
test('Cannot register activities using reserved prefixes', configMacro, async (t, config) => {
765+
const { createWorkerWithDefaults } = config;
766+
767+
for (const prefix of reservedPrefixes) {
768+
const activityName = prefix + '_test';
769+
await t.throwsAsync(
770+
createWorkerWithDefaults(t, {
771+
activities: { [activityName]: () => {} },
772+
}),
773+
{
774+
instanceOf: Error,
775+
message: `Cannot register activity name: '${activityName}', with reserved prefix: '${prefix}'`,
776+
}
777+
);
778+
}
779+
});
780+
781+
test('Cannot register task queues using reserved prefixes', configMacro, async (t, config) => {
782+
const { createWorkerWithDefaults } = config;
783+
784+
for (const prefix of reservedPrefixes) {
785+
const taskQueue = prefix + '_test';
786+
787+
await t.throwsAsync(
788+
createWorkerWithDefaults(t, {
789+
taskQueue,
790+
}),
791+
{
792+
instanceOf: Error,
793+
message: `Cannot register task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`,
794+
}
795+
);
796+
}
797+
});
798+
799+
interface HandlerError {
800+
name: string;
801+
message: string;
802+
}
803+
804+
export async function workflowBadPrefixHandler(prefix: string): Promise<HandlerError[]> {
805+
// Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
806+
const expectedErrors: HandlerError[] = [];
807+
try {
808+
setHandler(defineSignal(prefix + '_signal'), () => {});
809+
} catch (e) {
810+
if (e instanceof Error) {
811+
expectedErrors.push({ name: e.name, message: e.message });
812+
}
813+
}
814+
try {
815+
setHandler(defineUpdate(prefix + '_update'), () => {});
816+
} catch (e) {
817+
if (e instanceof Error) {
818+
expectedErrors.push({ name: e.name, message: e.message });
819+
}
820+
}
821+
try {
822+
setHandler(defineQuery(prefix + '_query'), () => {});
823+
} catch (e) {
824+
if (e instanceof Error) {
825+
expectedErrors.push({ name: e.name, message: e.message });
826+
}
827+
}
828+
return expectedErrors;
829+
}
830+
831+
test('Workflow failure if define signals/updates/queries with reserved prefixes', configMacro, async (t, config) => {
832+
const { env, createWorkerWithDefaults } = config;
833+
const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
834+
const worker = await createWorkerWithDefaults(t);
835+
await worker.runUntil(async () => {
836+
const prefix = reservedPrefixes[0];
837+
// for (const prefix of reservedPrefixes) {
838+
const result = await executeWorkflow(workflowBadPrefixHandler, {
839+
args: [prefix],
840+
});
841+
console.log('result', result);
842+
t.deepEqual(result, [
843+
{ name: 'Error', message: `Cannot register signal name: '${prefix}_signal', with reserved prefix: '${prefix}'` },
844+
{ name: 'Error', message: `Cannot register update name: '${prefix}_update', with reserved prefix: '${prefix}'` },
845+
{ name: 'Error', message: `Cannot register query name: '${prefix}_query', with reserved prefix: '${prefix}'` },
846+
]);
847+
// }
848+
});
849+
});

packages/test/src/test-workflows.ts

+86
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/v
2020
import { SdkFlag, SdkFlags } from '@temporalio/workflow/lib/flags';
2121
import { ReusableVMWorkflow, ReusableVMWorkflowCreator } from '@temporalio/worker/lib/workflow/reusable-vm';
2222
import { parseWorkflowCode } from '@temporalio/worker/lib/worker';
23+
import {
24+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
25+
reservedPrefixes,
26+
STACK_TRACE_RESERVED_PREFIX,
27+
TEMPORAL_RESERVED_PREFIX,
28+
} from '@temporalio/common/lib/reserved';
2329
import * as activityFunctions from './activities';
2430
import { cleanStackTrace, REUSE_V8_CONTEXT, u8 } from './helpers';
2531
import { ProcessedSignal } from './workflows';
@@ -2528,3 +2534,83 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11
25282534
);
25292535
}
25302536
});
2537+
2538+
test('Default query handler fail activations with reserved names - workflowWithDefaultHandlers', async (t) => {
2539+
const { workflowType } = t.context;
2540+
2541+
await activate(t, makeActivation(undefined, makeInitializeWorkflowJob(workflowType)));
2542+
2543+
for (const prefix of reservedPrefixes) {
2544+
const completion = await activate(t, makeActivation(undefined, makeQueryWorkflowJob('1', prefix + '_query')));
2545+
2546+
compareCompletion(t, completion, {
2547+
failed: {
2548+
failure: {
2549+
...completion.failed?.failure,
2550+
// We only care about the error message.
2551+
message: `Cannot register query name: '${prefix}_query', with reserved prefix: '${prefix}'`,
2552+
},
2553+
},
2554+
});
2555+
}
2556+
});
2557+
2558+
test('Default signal handler fail activations with temporal prefix - workflowWithDefaultHandlers', async (t) => {
2559+
const { workflowType } = t.context;
2560+
2561+
await activate(t, makeActivation(undefined, makeInitializeWorkflowJob(workflowType)));
2562+
const signalName = TEMPORAL_RESERVED_PREFIX + '_signal';
2563+
const job = makeSignalWorkflowJob(signalName, []);
2564+
2565+
const completion = await activate(t, makeActivation(undefined, job));
2566+
2567+
compareCompletion(t, completion, {
2568+
failed: {
2569+
failure: {
2570+
...completion.failed?.failure,
2571+
// We only care about the error message.
2572+
message: `Cannot register signal name: '${signalName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`,
2573+
},
2574+
},
2575+
});
2576+
});
2577+
2578+
test('Default signal handler fail activations with stack trace prefix - workflowWithDefaultHandlers', async (t) => {
2579+
const { workflowType } = t.context;
2580+
2581+
await activate(t, makeActivation(undefined, makeInitializeWorkflowJob(workflowType)));
2582+
const signalName = STACK_TRACE_RESERVED_PREFIX + '_signal';
2583+
const job = makeSignalWorkflowJob(signalName, []);
2584+
2585+
const completion = await activate(t, makeActivation(undefined, job));
2586+
2587+
compareCompletion(t, completion, {
2588+
failed: {
2589+
failure: {
2590+
...completion.failed?.failure,
2591+
// We only care about the error message.
2592+
message: `Cannot register signal name: '${signalName}', with reserved prefix: '${STACK_TRACE_RESERVED_PREFIX}'`,
2593+
},
2594+
},
2595+
});
2596+
});
2597+
2598+
test('Default signal handler fail activations with enhanced stack trace prefix - workflowWithDefaultHandlers', async (t) => {
2599+
const { workflowType } = t.context;
2600+
2601+
await activate(t, makeActivation(undefined, makeInitializeWorkflowJob(workflowType)));
2602+
const signalName = ENHANCED_STACK_TRACE_RESERVED_PREFIX + '_signal';
2603+
const job = makeSignalWorkflowJob(signalName, []);
2604+
2605+
const completion = await activate(t, makeActivation(undefined, job));
2606+
2607+
compareCompletion(t, completion, {
2608+
failed: {
2609+
failure: {
2610+
...completion.failed?.failure,
2611+
// We only care about the error message.
2612+
message: `Cannot register signal name: '${signalName}', with reserved prefix: '${ENHANCED_STACK_TRACE_RESERVED_PREFIX}'`,
2613+
},
2614+
},
2615+
});
2616+
});

packages/test/src/workflows/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,4 @@ export * from './upsert-and-read-search-attributes';
8989
export * from './wait-on-user';
9090
export * from './workflow-cancellation-scenarios';
9191
export * from './upsert-and-read-memo';
92+
export * from './workflow-with-default-handlers';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import {
2+
condition,
3+
defineSignal,
4+
setDefaultQueryHandler,
5+
setDefaultSignalHandler,
6+
setHandler,
7+
} from '@temporalio/workflow';
8+
9+
export async function workflowWithDefaultHandlers(): Promise<void> {
10+
const complete = true;
11+
setDefaultQueryHandler(() => {});
12+
setDefaultSignalHandler(() => {});
13+
setHandler(defineSignal('completeSignal'), () => {});
14+
15+
await condition(() => complete);
16+
}

packages/worker/src/worker-options.ts

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { LoggerSinks } from '@temporalio/workflow';
88
import { Context } from '@temporalio/activity';
99
import { checkExtends } from '@temporalio/common/lib/type-helpers';
1010
import { WorkerOptions as NativeWorkerOptions, WorkerTuner as NativeWorkerTuner } from '@temporalio/core-bridge';
11+
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
1112
import { ActivityInboundLogInterceptor } from './activity-log-interceptor';
1213
import { NativeConnection } from './connection';
1314
import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors';
@@ -822,6 +823,9 @@ export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): Co
822823
}
823824

824825
const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function'));
826+
for (const activityName of activities.keys()) {
827+
throwIfReservedName('activity', activityName);
828+
}
825829
const tuner = asNativeTuner(opts.tuner, logger);
826830

827831
return {

packages/worker/src/worker.ts

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import * as native from '@temporalio/core-bridge';
5555
import { ShutdownError, UnexpectedError } from '@temporalio/core-bridge';
5656
import { coresdk, temporal } from '@temporalio/proto';
5757
import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow';
58+
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
5859
import { Activity, CancelReason, activityLogAttributes } from './activity';
5960
import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection';
6061
import { ActivityExecuteInput } from './interceptors';
@@ -462,6 +463,7 @@ export class Worker {
462463
* This method initiates a connection to the server and will throw (asynchronously) on connection failure.
463464
*/
464465
public static async create(options: WorkerOptions): Promise<Worker> {
466+
throwIfReservedName('task queue', options.taskQueue);
465467
const logger = withMetadata(Runtime.instance().logger, {
466468
sdkComponent: SdkComponent.worker,
467469
taskQueue: options.taskQueue ?? 'default',

packages/workflow/src/internals.ts

+29-8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ import {
2626
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
2727
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
2828
import type { coresdk, temporal } from '@temporalio/proto';
29+
import {
30+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
31+
STACK_TRACE_RESERVED_PREFIX,
32+
isReservedName,
33+
throwIfReservedName,
34+
} from '@temporalio/common/lib/reserved';
2935
import { alea, RNG } from './alea';
3036
import { RootCancellationScope } from './cancellation-scope';
3137
import { UpdateScope } from './update-scope';
@@ -249,7 +255,7 @@ export class Activator implements ActivationHandler {
249255
*/
250256
public readonly queryHandlers = new Map<string, WorkflowQueryAnnotatedType>([
251257
[
252-
'__stack_trace',
258+
STACK_TRACE_RESERVED_PREFIX,
253259
{
254260
handler: () => {
255261
return this.getStackTraces()
@@ -260,7 +266,7 @@ export class Activator implements ActivationHandler {
260266
},
261267
],
262268
[
263-
'__enhanced_stack_trace',
269+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
264270
{
265271
handler: (): EnhancedStackTrace => {
266272
const { sourceMap } = this;
@@ -619,6 +625,8 @@ export class Activator implements ActivationHandler {
619625
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
620626
let fn = this.queryHandlers.get(queryName)?.handler;
621627
if (fn === undefined && this.defaultQueryHandler !== undefined) {
628+
// Do not call default query handler with reserved query name.
629+
throwIfReservedName('query', queryName);
622630
fn = this.defaultQueryHandler.bind(this, queryName);
623631
}
624632
// No handler or default registered, fail.
@@ -649,17 +657,28 @@ export class Activator implements ActivationHandler {
649657
throw new TypeError('Missing query activation attributes');
650658
}
651659

660+
const queryInput = {
661+
queryName: queryType,
662+
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
663+
queryId,
664+
headers: headers ?? {},
665+
};
666+
667+
// Skip interceptors if this is an internal query.
668+
if (isReservedName(queryType)) {
669+
this.queryWorkflowNextHandler(queryInput).then(
670+
(result) => this.completeQuery(queryId, result),
671+
(reason) => this.failQuery(queryId, reason)
672+
);
673+
return;
674+
}
675+
652676
const execute = composeInterceptors(
653677
this.interceptors.inbound,
654678
'handleQuery',
655679
this.queryWorkflowNextHandler.bind(this)
656680
);
657-
execute({
658-
queryName: queryType,
659-
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
660-
queryId,
661-
headers: headers ?? {},
662-
}).then(
681+
execute(queryInput).then(
663682
(result) => this.completeQuery(queryId, result),
664683
(reason) => this.failQuery(queryId, reason)
665684
);
@@ -797,6 +816,8 @@ export class Activator implements ActivationHandler {
797816
if (fn) {
798817
return await fn(...args);
799818
} else if (this.defaultSignalHandler) {
819+
// Do not call default signal handler with reserved signal name.
820+
throwIfReservedName('signal', signalName);
800821
return await this.defaultSignalHandler(signalName, ...args);
801822
} else {
802823
throw new IllegalStateError(`No registered signal handler for signal: ${signalName}`);

packages/workflow/src/workflow.ts

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten
2525
import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time';
2626
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
2727
import { temporal } from '@temporalio/proto';
28+
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
2829
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
2930
import { UpdateScope } from './update-scope';
3031
import {
@@ -1258,6 +1259,8 @@ export function setHandler<
12581259
options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions<Args>
12591260
): void {
12601261
const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.');
1262+
// Cannot register handler for reserved names
1263+
throwIfReservedName(def.type, def.name);
12611264
const description = options?.description;
12621265
if (def.type === 'update') {
12631266
if (typeof handler === 'function') {

0 commit comments

Comments
 (0)