diff --git a/.changeset/sweet-suits-kick.md b/.changeset/sweet-suits-kick.md new file mode 100644 index 0000000000..e30f302f2e --- /dev/null +++ b/.changeset/sweet-suits-kick.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/core": patch +--- + +Add option to trigger batched items sequentially, and default to parallel triggering which is faster diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 93dc6d1b1f..5b1b89d2d9 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -9,15 +9,21 @@ import { env } from "~/env.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; -import { BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server"; +import { + BatchProcessingStrategy, + BatchTriggerV2Service, +} from "~/v3/services/batchTriggerV2.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { z } from "zod"; const { action, loader } = createActionApiRoute( { - headers: HeadersSchema, + headers: HeadersSchema.extend({ + "batch-processing-strategy": BatchProcessingStrategy.nullish(), + }), body: BatchTriggerTaskV2RequestBody, allowJWT: true, maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE, @@ -52,6 +58,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-span-parent-as-link": spanParentAsLink, "x-trigger-worker": isFromWorker, "x-trigger-client": triggerClient, + "batch-processing-strategy": batchProcessingStrategy, traceparent, tracestate, } = headers; @@ -67,6 +74,7 @@ const { action, loader } = createActionApiRoute( triggerClient, traceparent, tracestate, + batchProcessingStrategy, }); const traceContext = @@ -79,7 +87,7 @@ const { action, loader } = createActionApiRoute( resolveIdempotencyKeyTTL(idempotencyKeyTTL) ?? new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); - const service = new BatchTriggerV2Service(); + const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined); try { const batch = await service.call(authentication.environment, body, { diff --git a/apps/webapp/app/routes/api.v3.runs.$runId.ts b/apps/webapp/app/routes/api.v3.runs.$runId.ts index 5d0a0e5d16..f144f8effd 100644 --- a/apps/webapp/app/routes/api.v3.runs.$runId.ts +++ b/apps/webapp/app/routes/api.v3.runs.$runId.ts @@ -15,6 +15,7 @@ export const loader = createLoaderApiRoute( findResource: (params, auth) => { return ApiRetrieveRunPresenter.findRun(params.runId, auth.environment); }, + shouldRetryNotFound: true, authorization: { action: "read", resource: (run) => ({ diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index 82c2d48a7f..dc35a0cd24 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -33,6 +33,7 @@ type ApiKeyRouteBuilderOptions< params: TParamsSchema extends z.AnyZodObject ? z.infer : undefined, authentication: ApiAuthenticationResultSuccess ) => Promise; + shouldRetryNotFound?: boolean; authorization?: { action: AuthorizationAction; resource: ( @@ -81,6 +82,7 @@ export function createLoaderApiRoute< corsStrategy = "none", authorization, findResource, + shouldRetryNotFound, } = options; if (corsStrategy !== "none" && request.method.toUpperCase() === "OPTIONS") { @@ -162,7 +164,10 @@ export function createLoaderApiRoute< if (!resource) { return await wrapResponse( request, - json({ error: "Not found" }, { status: 404 }), + json( + { error: "Not found" }, + { status: 404, headers: { "x-should-retry": shouldRetryNotFound ? "true" : "false" } } + ), corsStrategy !== "none" ); } diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index a7b43f4215..c2409cf8c5 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -733,7 +733,7 @@ function getWorkerQueue() { priority: 0, maxAttempts: 5, handler: async (payload, job) => { - const service = new BatchTriggerV2Service(); + const service = new BatchTriggerV2Service(payload.strategy); await service.processBatchTaskRun(payload); }, diff --git a/apps/webapp/app/v3/services/batchTriggerV2.server.ts b/apps/webapp/app/v3/services/batchTriggerV2.server.ts index 733261f41a..02a0cb768d 100644 --- a/apps/webapp/app/v3/services/batchTriggerV2.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV2.server.ts @@ -6,7 +6,7 @@ import { parsePacket, } from "@trigger.dev/core/v3"; import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database"; -import { $transaction, PrismaClientOrTransaction } from "~/db.server"; +import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -25,12 +25,10 @@ import { z } from "zod"; const PROCESSING_BATCH_SIZE = 50; const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20; +const MAX_ATTEMPTS = 10; -const BatchProcessingStrategy = z.enum(["sequential", "parallel"]); - -type BatchProcessingStrategy = z.infer; - -const CURRENT_STRATEGY: BatchProcessingStrategy = "parallel"; +export const BatchProcessingStrategy = z.enum(["sequential", "parallel"]); +export type BatchProcessingStrategy = z.infer; export const BatchProcessingOptions = z.object({ batchId: z.string(), @@ -52,6 +50,17 @@ export type BatchTriggerTaskServiceOptions = { }; export class BatchTriggerV2Service extends BaseService { + private _batchProcessingStrategy: BatchProcessingStrategy; + + constructor( + batchProcessingStrategy?: BatchProcessingStrategy, + protected readonly _prisma: PrismaClientOrTransaction = prisma + ) { + super(_prisma); + + this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel"; + } + public async call( environment: AuthenticatedEnvironment, body: BatchTriggerTaskV2RequestBody, @@ -452,14 +461,14 @@ export class BatchTriggerV2Service extends BaseService { }, }); - switch (CURRENT_STRATEGY) { + switch (this._batchProcessingStrategy) { case "sequential": { await this.#enqueueBatchTaskRun({ batchId: batch.id, processingId: batchId, range: { start: 0, count: PROCESSING_BATCH_SIZE }, attemptCount: 0, - strategy: CURRENT_STRATEGY, + strategy: this._batchProcessingStrategy, }); break; @@ -480,7 +489,7 @@ export class BatchTriggerV2Service extends BaseService { processingId: `${index}`, range, attemptCount: 0, - strategy: CURRENT_STRATEGY, + strategy: this._batchProcessingStrategy, }, tx ) @@ -539,6 +548,16 @@ export class BatchTriggerV2Service extends BaseService { const $attemptCount = options.attemptCount + 1; + // Add early return if max attempts reached + if ($attemptCount > MAX_ATTEMPTS) { + logger.error("[BatchTriggerV2][processBatchTaskRun] Max attempts reached", { + options, + attemptCount: $attemptCount, + }); + // You might want to update the batch status to failed here + return; + } + const batch = await this._prisma.batchTaskRun.findFirst({ where: { id: options.batchId }, include: { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index db535c2f54..df02a61fc1 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -61,7 +61,7 @@ services: - 6379:6379 electric: - image: electricsql/electric:0.8.1 + image: electricsql/electric:0.9.4 restart: always environment: DATABASE_URL: postgresql://postgres:postgres@database:5432/postgres?sslmode=disable diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 162ae81f8a..734a5e5625 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -55,7 +55,7 @@ export async function createElectricContainer( network.getName() )}:5432/${postgresContainer.getDatabase()}?sslmode=disable`; - const container = await new GenericContainer("electricsql/electric:0.8.1") + const container = await new GenericContainer("electricsql/electric:0.9.4") .withExposedPorts(3000) .withNetwork(network) .withEnvironment({ diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index a49db20c77..dcded16468 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -74,6 +74,7 @@ export type ClientTriggerOptions = { export type ClientBatchTriggerOptions = ClientTriggerOptions & { idempotencyKey?: string; idempotencyKeyTTL?: string; + processingStrategy?: "parallel" | "sequential"; }; export type TriggerRequestOptions = ZodFetchOptions & { @@ -239,6 +240,7 @@ export class ApiClient { headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false, { "idempotency-key": clientOptions?.idempotencyKey, "idempotency-key-ttl": clientOptions?.idempotencyKeyTTL, + "batch-processing-strategy": clientOptions?.processingStrategy, }), body: JSON.stringify(body), }, diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index e075211429..a4d2edf9ee 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -592,7 +592,8 @@ export interface Task * ``` */ batchTriggerAndWait: ( - items: Array> + items: Array>, + options?: BatchTriggerAndWaitOptions ) => Promise>; } @@ -781,6 +782,32 @@ export type TriggerAndWaitOptions = Omit { + batchTriggerAndWait: async (items, options) => { const taskMetadata = taskCatalog.getTaskManifest(params.id); return await batchTriggerAndWait_internal( @@ -191,6 +192,7 @@ export function createTask< params.id, items, undefined, + options, undefined, customQueue ); @@ -326,7 +328,7 @@ export function createSchemaTask< }); }, params.id); }, - batchTriggerAndWait: async (items) => { + batchTriggerAndWait: async (items, options) => { const taskMetadata = taskCatalog.getTaskManifest(params.id); return await batchTriggerAndWait_internal, TOutput>( @@ -336,6 +338,7 @@ export function createSchemaTask< params.id, items, parsePayload, + options, undefined, customQueue ); @@ -469,13 +472,14 @@ export function triggerAndWait( export async function batchTriggerAndWait( id: TaskIdentifier, items: Array>>, + options?: BatchTriggerAndWaitOptions, requestOptions?: ApiRequestOptions ): Promise, TaskOutput>> { return await batchTriggerAndWait_internal< TaskIdentifier, TaskPayload, TaskOutput - >("tasks.batchTriggerAndWait()", id, items, undefined, requestOptions); + >("tasks.batchTriggerAndWait()", id, items, undefined, options, requestOptions); } /** @@ -618,6 +622,7 @@ export async function batchTriggerById( spanParentAsLink: true, idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), idempotencyKeyTTL: options?.idempotencyKeyTTL, + processingStrategy: options?.triggerSequentially ? "sequential" : undefined, }, { name: "batch.trigger()", @@ -740,6 +745,7 @@ export async function batchTriggerById( */ export async function batchTriggerByIdAndWait( items: Array>>, + options?: BatchTriggerAndWaitOptions, requestOptions?: TriggerApiRequestOptions ): Promise> { const ctx = taskContext.ctx; @@ -786,7 +792,9 @@ export async function batchTriggerByIdAndWait( ), dependentAttempt: ctx.attempt.id, }, - {}, + { + processingStrategy: options?.triggerSequentially ? "sequential" : undefined, + }, requestOptions ); @@ -948,6 +956,7 @@ export async function batchTriggerTasks( spanParentAsLink: true, idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), idempotencyKeyTTL: options?.idempotencyKeyTTL, + processingStrategy: options?.triggerSequentially ? "sequential" : undefined, }, { name: "batch.triggerByTask()", @@ -1072,6 +1081,7 @@ export async function batchTriggerAndWaitTasks; }, + options?: BatchTriggerAndWaitOptions, requestOptions?: TriggerApiRequestOptions ): Promise> { const ctx = taskContext.ctx; @@ -1118,7 +1128,9 @@ export async function batchTriggerAndWaitTasks( spanParentAsLink: true, idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey), idempotencyKeyTTL: options?.idempotencyKeyTTL, + processingStrategy: options?.triggerSequentially ? "sequential" : undefined, }, { name, @@ -1377,6 +1390,7 @@ async function batchTriggerAndWait_internal>, parsePayload?: SchemaParseFn, + options?: BatchTriggerAndWaitOptions, requestOptions?: ApiRequestOptions, queue?: QueueOptions ): Promise> { @@ -1420,7 +1434,9 @@ async function batchTriggerAndWait_internal { - const response1 = await batch.trigger([ - { id: "all-v2-test-child-1", payload: { child1: "foo" } }, - { id: "all-v2-test-child-2", payload: { child2: "bar" } }, - { id: "all-v2-test-child-1", payload: { child1: "baz" } }, - ]); + run: async ({ triggerSequentially }: { triggerSequentially?: boolean }) => { + const response1 = await batch.trigger( + [ + { id: "all-v2-test-child-1", payload: { child1: "foo" } }, + { id: "all-v2-test-child-2", payload: { child2: "bar" } }, + { id: "all-v2-test-child-1", payload: { child1: "baz" } }, + ], + { + triggerSequentially, + } + ); logger.debug("Response 1", { response1 }); @@ -156,11 +161,16 @@ export const allV2TestTask = task({ const { runs: [batchRun1, batchRun2, batchRun3], - } = await batch.triggerByTask([ - { task: allV2ChildTask1, payload: { child1: "foo" } }, - { task: allV2ChildTask2, payload: { child2: "bar" } }, - { task: allV2ChildTask1, payload: { child1: "baz" } }, - ]); + } = await batch.triggerByTask( + [ + { task: allV2ChildTask1, payload: { child1: "foo" } }, + { task: allV2ChildTask2, payload: { child2: "bar" } }, + { task: allV2ChildTask1, payload: { child1: "baz" } }, + ], + { + triggerSequentially, + } + ); logger.debug("Batch runs", { batchRun1, batchRun2, batchRun3 }); @@ -179,11 +189,16 @@ export const allV2TestTask = task({ type TaskRun3Payload = Expect>; type TaskRun3Output = Expect>; - const response3 = await batch.triggerAndWait([ - { id: "all-v2-test-child-1", payload: { child1: "foo" } }, - { id: "all-v2-test-child-2", payload: { child2: "bar" } }, - { id: "all-v2-test-child-1", payload: { child1: "baz" } }, - ]); + const response3 = await batch.triggerAndWait( + [ + { id: "all-v2-test-child-1", payload: { child1: "foo" } }, + { id: "all-v2-test-child-2", payload: { child2: "bar" } }, + { id: "all-v2-test-child-1", payload: { child1: "baz" } }, + ], + { + triggerSequentially, + } + ); logger.debug("Response 3", { response3 }); @@ -225,11 +240,16 @@ export const allV2TestTask = task({ const { runs: [batch2Run1, batch2Run2, batch2Run3], - } = await batch.triggerByTaskAndWait([ - { task: allV2ChildTask1, payload: { child1: "foo" } }, - { task: allV2ChildTask2, payload: { child2: "bar" } }, - { task: allV2ChildTask1, payload: { child1: "baz" } }, - ]); + } = await batch.triggerByTaskAndWait( + [ + { task: allV2ChildTask1, payload: { child1: "foo" } }, + { task: allV2ChildTask2, payload: { child2: "bar" } }, + { task: allV2ChildTask1, payload: { child1: "baz" } }, + ], + { + triggerSequentially, + } + ); logger.debug("Batch 2 runs", { batch2Run1, batch2Run2, batch2Run3 }); @@ -276,14 +296,17 @@ export const batchV2TestTask = task({ retry: { maxAttempts: 1, }, - run: async () => { + run: async ({ triggerSequentially }: { triggerSequentially?: boolean }) => { // First lets try triggering with too many items try { await tasks.batchTrigger( "batch-v2-test-child", Array.from({ length: 501 }, (_, i) => ({ payload: { foo: `bar${i}` }, - })) + })), + { + triggerSequentially, + } ); assert.fail("Batch trigger should have failed"); @@ -299,10 +322,12 @@ export const batchV2TestTask = task({ // tasks.batchTrigger // tasks.batchTriggerAndWait // myTask.batchTriggerAndWait - const response1 = await batchV2TestChild.batchTrigger([ - { payload: { foo: "bar" } }, - { payload: { foo: "baz" } }, - ]); + const response1 = await batchV2TestChild.batchTrigger( + [{ payload: { foo: "bar" } }, { payload: { foo: "baz" } }], + { + triggerSequentially, + } + ); logger.info("Response 1", { response1 }); @@ -360,7 +385,10 @@ export const batchV2TestTask = task({ const response2 = await batchV2TestChild.batchTrigger( Array.from({ length: 30 }, (_, i) => ({ payload: { foo: `bar${i}` }, - })) + })), + { + triggerSequentially, + } ); logger.info("Response 2", { response2 }); @@ -385,6 +413,7 @@ export const batchV2TestTask = task({ { idempotencyKey: idempotencyKey1, idempotencyKeyTTL: "5s", + triggerSequentially, } ); @@ -401,6 +430,7 @@ export const batchV2TestTask = task({ { idempotencyKey: idempotencyKey1, idempotencyKeyTTL: "5s", + triggerSequentially, } ); @@ -429,6 +459,7 @@ export const batchV2TestTask = task({ { idempotencyKey: idempotencyKey1, idempotencyKeyTTL: "5s", + triggerSequentially, } ); @@ -445,16 +476,21 @@ export const batchV2TestTask = task({ const idempotencyKeyChild1 = randomUUID(); const idempotencyKeyChild2 = randomUUID(); - const response6 = await batchV2TestChild.batchTrigger([ - { - payload: { foo: "bar" }, - options: { idempotencyKey: idempotencyKeyChild1, idempotencyKeyTTL: "5s" }, - }, + const response6 = await batchV2TestChild.batchTrigger( + [ + { + payload: { foo: "bar" }, + options: { idempotencyKey: idempotencyKeyChild1, idempotencyKeyTTL: "5s" }, + }, + { + payload: { foo: "baz" }, + options: { idempotencyKey: idempotencyKeyChild2, idempotencyKeyTTL: "15s" }, + }, + ], { - payload: { foo: "baz" }, - options: { idempotencyKey: idempotencyKeyChild2, idempotencyKeyTTL: "15s" }, - }, - ]); + triggerSequentially, + } + ); logger.info("Response 6", { response6 }); @@ -466,10 +502,15 @@ export const batchV2TestTask = task({ await setTimeout(1000); - const response7 = await batchV2TestChild.batchTrigger([ - { payload: { foo: "bar" }, options: { idempotencyKey: idempotencyKeyChild1 } }, - { payload: { foo: "baz" }, options: { idempotencyKey: idempotencyKeyChild2 } }, - ]); + const response7 = await batchV2TestChild.batchTrigger( + [ + { payload: { foo: "bar" }, options: { idempotencyKey: idempotencyKeyChild1 } }, + { payload: { foo: "baz" }, options: { idempotencyKey: idempotencyKeyChild2 } }, + ], + { + triggerSequentially, + } + ); logger.info("Response 7", { response7 }); @@ -490,10 +531,15 @@ export const batchV2TestTask = task({ await wait.for({ seconds: 6 }); // Now we need to test that the first run is not cached and is a new run, and the second run is cached - const response8 = await batchV2TestChild.batchTrigger([ - { payload: { foo: "bar" }, options: { idempotencyKey: idempotencyKeyChild1 } }, - { payload: { foo: "baz" }, options: { idempotencyKey: idempotencyKeyChild2 } }, - ]); + const response8 = await batchV2TestChild.batchTrigger( + [ + { payload: { foo: "bar" }, options: { idempotencyKey: idempotencyKeyChild1 } }, + { payload: { foo: "baz" }, options: { idempotencyKey: idempotencyKeyChild2 } }, + ], + { + triggerSequentially, + } + ); logger.info("Response 8", { response8 }); @@ -512,10 +558,12 @@ export const batchV2TestTask = task({ ); // Now we need to test with batchTriggerAndWait - const response9 = await batchV2TestChild.batchTriggerAndWait([ - { payload: { foo: "bar" } }, - { payload: { foo: "baz" } }, - ]); + const response9 = await batchV2TestChild.batchTriggerAndWait( + [{ payload: { foo: "bar" } }, { payload: { foo: "baz" } }], + { + triggerSequentially, + } + ); logger.debug("Response 9", { response9 }); @@ -548,7 +596,10 @@ export const batchV2TestTask = task({ const response10 = await batchV2TestChild.batchTriggerAndWait( Array.from({ length: 21 }, (_, i) => ({ payload: { foo: `bar${i}` }, - })) + })), + { + triggerSequentially, + } ); logger.debug("Response 10", { response10 }); @@ -557,10 +608,13 @@ export const batchV2TestTask = task({ assert.equal(response10.runs.length, 21, "response10: Items length is invalid"); // Now repeat the first few tests using `tasks.batchTrigger`: - const response11 = await tasks.batchTrigger("batch-v2-test-child", [ - { payload: { foo: "bar" } }, - { payload: { foo: "baz" } }, - ]); + const response11 = await tasks.batchTrigger( + "batch-v2-test-child", + [{ payload: { foo: "bar" } }, { payload: { foo: "baz" } }], + { + triggerSequentially, + } + ); logger.debug("Response 11", { response11 }); @@ -584,7 +638,10 @@ export const batchV2TestTask = task({ "batch-v2-test-child", Array.from({ length: 100 }, (_, i) => ({ payload: { foo: `bar${i}` }, - })) + })), + { + triggerSequentially, + } ); const response12Start = performance.now();