Skip to content

Commit

Permalink
Optionally trigger batched items sequentially to preserve order (#1536)
Browse files Browse the repository at this point in the history
* Optionally trigger batched items sequentially to preserve order

* Fix infinite v3.processBatchTaskRun enqueuings by checking the attemptCount
  • Loading branch information
ericallam authored Dec 5, 2024
1 parent 91afa5e commit 2a07ea4
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 78 deletions.
6 changes: 6 additions & 0 deletions .changeset/sweet-suits-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Add option to trigger batched items sequentially, and default to parallel triggering which is faster
14 changes: 11 additions & 3 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ import { env } from "~/env.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { z } from "zod";

const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema,
headers: HeadersSchema.extend({
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
}),
body: BatchTriggerTaskV2RequestBody,
allowJWT: true,
maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE,
Expand Down Expand Up @@ -52,6 +58,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-span-parent-as-link": spanParentAsLink,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"batch-processing-strategy": batchProcessingStrategy,
traceparent,
tracestate,
} = headers;
Expand All @@ -67,6 +74,7 @@ const { action, loader } = createActionApiRoute(
triggerClient,
traceparent,
tracestate,
batchProcessingStrategy,
});

const traceContext =
Expand All @@ -79,7 +87,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/api.v3.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const loader = createLoaderApiRoute(
findResource: (params, auth) => {
return ApiRetrieveRunPresenter.findRun(params.runId, auth.environment);
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => ({
Expand Down
7 changes: 6 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ApiKeyRouteBuilderOptions<
params: TParamsSchema extends z.AnyZodObject ? z.infer<TParamsSchema> : undefined,
authentication: ApiAuthenticationResultSuccess
) => Promise<TResource | undefined>;
shouldRetryNotFound?: boolean;
authorization?: {
action: AuthorizationAction;
resource: (
Expand Down Expand Up @@ -81,6 +82,7 @@ export function createLoaderApiRoute<
corsStrategy = "none",
authorization,
findResource,
shouldRetryNotFound,
} = options;

if (corsStrategy !== "none" && request.method.toUpperCase() === "OPTIONS") {
Expand Down Expand Up @@ -162,7 +164,10 @@ export function createLoaderApiRoute<
if (!resource) {
return await wrapResponse(
request,
json({ error: "Not found" }, { status: 404 }),
json(
{ error: "Not found" },
{ status: 404, headers: { "x-should-retry": shouldRetryNotFound ? "true" : "false" } }
),
corsStrategy !== "none"
);
}
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ function getWorkerQueue() {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
Expand Down
37 changes: 28 additions & 9 deletions apps/webapp/app/v3/services/batchTriggerV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
parsePacket,
} from "@trigger.dev/core/v3";
import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand All @@ -25,12 +25,10 @@ import { z } from "zod";

const PROCESSING_BATCH_SIZE = 50;
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;
const MAX_ATTEMPTS = 10;

const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);

type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

const CURRENT_STRATEGY: BatchProcessingStrategy = "parallel";
export const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);
export type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

export const BatchProcessingOptions = z.object({
batchId: z.string(),
Expand All @@ -52,6 +50,17 @@ export type BatchTriggerTaskServiceOptions = {
};

export class BatchTriggerV2Service extends BaseService {
private _batchProcessingStrategy: BatchProcessingStrategy;

constructor(
batchProcessingStrategy?: BatchProcessingStrategy,
protected readonly _prisma: PrismaClientOrTransaction = prisma
) {
super(_prisma);

this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
}

public async call(
environment: AuthenticatedEnvironment,
body: BatchTriggerTaskV2RequestBody,
Expand Down Expand Up @@ -452,14 +461,14 @@ export class BatchTriggerV2Service extends BaseService {
},
});

switch (CURRENT_STRATEGY) {
switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
});

break;
Expand All @@ -480,7 +489,7 @@ export class BatchTriggerV2Service extends BaseService {
processingId: `${index}`,
range,
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
},
tx
)
Expand Down Expand Up @@ -539,6 +548,16 @@ export class BatchTriggerV2Service extends BaseService {

const $attemptCount = options.attemptCount + 1;

// Add early return if max attempts reached
if ($attemptCount > MAX_ATTEMPTS) {
logger.error("[BatchTriggerV2][processBatchTaskRun] Max attempts reached", {
options,
attemptCount: $attemptCount,
});
// You might want to update the batch status to failed here
return;
}

const batch = await this._prisma.batchTaskRun.findFirst({
where: { id: options.batchId },
include: {
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ services:
- 6379:6379

electric:
image: electricsql/electric:0.8.1
image: electricsql/electric:0.9.4
restart: always
environment:
DATABASE_URL: postgresql://postgres:postgres@database:5432/postgres?sslmode=disable
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/testcontainers/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export async function createElectricContainer(
network.getName()
)}:5432/${postgresContainer.getDatabase()}?sslmode=disable`;

const container = await new GenericContainer("electricsql/electric:0.8.1")
const container = await new GenericContainer("electricsql/electric:0.9.4")
.withExposedPorts(3000)
.withNetwork(network)
.withEnvironment({
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type ClientTriggerOptions = {
export type ClientBatchTriggerOptions = ClientTriggerOptions & {
idempotencyKey?: string;
idempotencyKeyTTL?: string;
processingStrategy?: "parallel" | "sequential";
};

export type TriggerRequestOptions = ZodFetchOptions & {
Expand Down Expand Up @@ -239,6 +240,7 @@ export class ApiClient {
headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false, {
"idempotency-key": clientOptions?.idempotencyKey,
"idempotency-key-ttl": clientOptions?.idempotencyKeyTTL,
"batch-processing-strategy": clientOptions?.processingStrategy,
}),
body: JSON.stringify(body),
},
Expand Down
29 changes: 28 additions & 1 deletion packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
* ```
*/
batchTriggerAndWait: (
items: Array<BatchTriggerAndWaitItem<TInput>>
items: Array<BatchTriggerAndWaitItem<TInput>>,
options?: BatchTriggerAndWaitOptions
) => Promise<BatchResult<TIdentifier, TOutput>>;
}

Expand Down Expand Up @@ -781,6 +782,32 @@ export type TriggerAndWaitOptions = Omit<TriggerOptions, "idempotencyKey" | "ide
export type BatchTriggerOptions = {
idempotencyKey?: IdempotencyKey | string | string[];
idempotencyKeyTTL?: string;

/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type BatchTriggerAndWaitOptions = {
/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type TaskMetadataWithFunctions = TaskMetadata & {
Expand Down
Loading

0 comments on commit 2a07ea4

Please sign in to comment.