Skip to content

Commit

Permalink
Fix: trigger queue updates (#1491)
Browse files Browse the repository at this point in the history
* Fixed task trigger queue update logic to only update an existing queue if the concurrency limit changes, instead of on every single trigger task call

* Fix bad update from another branch
  • Loading branch information
ericallam authored Nov 22, 2024
1 parent e7fc592 commit 4305a23
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 41 deletions.
40 changes: 19 additions & 21 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,30 +131,28 @@ export class CompleteAttemptService extends BaseService {
taskRunAttempt: NonNullable<FoundAttempt>,
env?: AuthenticatedEnvironment
): Promise<"COMPLETED"> {
await $transaction(this._prisma, async (tx) => {
await tx.taskRunAttempt.update({
where: { id: taskRunAttempt.id },
data: {
status: "COMPLETED",
completedAt: new Date(),
output: completion.output,
outputType: completion.outputType,
usageDurationMs: completion.usage?.durationMs,
taskRun: {
update: {
output: completion.output,
outputType: completion.outputType,
},
await this._prisma.taskRunAttempt.update({
where: { id: taskRunAttempt.id },
data: {
status: "COMPLETED",
completedAt: new Date(),
output: completion.output,
outputType: completion.outputType,
usageDurationMs: completion.usage?.durationMs,
taskRun: {
update: {
output: completion.output,
outputType: completion.outputType,
},
},
});
},
});

const finalizeService = new FinalizeTaskRunService(tx);
await finalizeService.call({
id: taskRunAttempt.taskRunId,
status: "COMPLETED_SUCCESSFULLY",
completedAt: new Date(),
});
const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRunAttempt.taskRunId,
status: "COMPLETED_SUCCESSFULLY",
completedAt: new Date(),
});

// Now we need to "complete" the task run event/span
Expand Down
56 changes: 37 additions & 19 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,20 +414,40 @@ export class TriggerTaskService extends BaseService {
},
});

const existingConcurrencyLimit =
typeof taskQueue?.concurrencyLimit === "number"
? taskQueue.concurrencyLimit
: undefined;

if (taskQueue) {
taskQueue = await tx.taskQueue.update({
where: {
id: taskQueue.id,
},
data: {
concurrencyLimit,
rateLimit: body.options.queue.rateLimit,
},
});
if (existingConcurrencyLimit !== concurrencyLimit) {
taskQueue = await tx.taskQueue.update({
where: {
id: taskQueue.id,
},
data: {
concurrencyLimit:
typeof concurrencyLimit === "number" ? concurrencyLimit : null,
rateLimit: body.options.queue.rateLimit,
},
});

if (typeof taskQueue.concurrencyLimit === "number") {
await marqs?.updateQueueConcurrencyLimits(
environment,
taskQueue.name,
taskQueue.concurrencyLimit
);
} else {
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
}
}
} else {
const queueId = generateFriendlyId("queue");

taskQueue = await tx.taskQueue.create({
data: {
friendlyId: generateFriendlyId("queue"),
friendlyId: queueId,
name: queueName,
concurrencyLimit,
runtimeEnvironmentId: environment.id,
Expand All @@ -436,16 +456,14 @@ export class TriggerTaskService extends BaseService {
type: "NAMED",
},
});
}

if (typeof taskQueue.concurrencyLimit === "number") {
await marqs?.updateQueueConcurrencyLimits(
environment,
taskQueue.name,
taskQueue.concurrencyLimit
);
} else {
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
if (typeof taskQueue.concurrencyLimit === "number") {
await marqs?.updateQueueConcurrencyLimits(
environment,
taskQueue.name,
taskQueue.concurrencyLimit
);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion references/v3-catalog/src/trigger/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export const batchChildTask = task({
retry: {
maxAttempts: 2,
},
run: async (payload: string, { ctx }) => {
run: async (payload: any, { ctx }) => {
logger.info("Processing child task", { payload });

await wait.for({ seconds: 1 });
Expand Down

0 comments on commit 4305a23

Please sign in to comment.