From 4efe759e4d91454e4ae54f560c03bc8f2598f175 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 29 Nov 2024 17:26:58 +0000 Subject: [PATCH] add backoff retries to messages with acks --- apps/coordinator/src/index.ts | 236 ++++++++++++++++++++++++++-------- 1 file changed, 179 insertions(+), 57 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index e2a7ba3259..e97835c339 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -11,7 +11,7 @@ import { } from "@trigger.dev/core/v3"; import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace"; import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket"; -import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; +import { ExponentialBackoff, HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; import { ChaosMonkey } from "./chaosMonkey"; import { Checkpointer } from "./checkpointer"; import { boolFromEnv, numFromEnv } from "./util"; @@ -37,6 +37,22 @@ const chaosMonkey = new ChaosMonkey( !!process.env.CHAOS_MONKEY_DISABLE_DELAYS ); +const backoff = new ExponentialBackoff("FullJitter", { + maxRetries: 7, +}); + +function serializeError(error: unknown) { + if (error instanceof Error) { + return { + name: error.name, + message: error.message, + stack: error.stack, + }; + } + + return error; +} + class CheckpointReadinessTimeoutError extends Error {} class CheckpointCancelError extends Error {} @@ -612,12 +628,36 @@ class TaskCoordinator { log.log("Handling READY_FOR_LAZY_ATTEMPT"); try { - const lazyAttempt = await this.#platformSocket?.sendWithAck("READY_FOR_LAZY_ATTEMPT", { - ...message, - envId: socket.data.envId, - }); + const lazyAttempt = await backoff + .max(2) // The run controller expects a reply in 10s so we can't have long retry delays + .maxRetries(5) + .execute(async () => { + return await this.#platformSocket?.sendWithAck("READY_FOR_LAZY_ATTEMPT", { + ...message, + envId: socket.data.envId, + }); + }); + + if (!lazyAttempt.success) { + log.error("Failed to send READY_FOR_LAZY_ATTEMPT", { + error: serializeError(lazyAttempt.error), + cause: lazyAttempt.cause, + }); + + await crashRun({ + name: "ReadyForLazyAttemptError", + message: + lazyAttempt.error instanceof Error + ? `[${lazyAttempt.cause}] ${lazyAttempt.error.name}: ${lazyAttempt.error.message}` + : `[${lazyAttempt.cause}] ${lazyAttempt.error}`, + }); + + return; + } + + const lazyAttemptResponse = lazyAttempt.result; - if (!lazyAttempt) { + if (!lazyAttemptResponse) { log.error("no lazy attempt ack"); await crashRun({ @@ -628,8 +668,10 @@ class TaskCoordinator { return; } - if (!lazyAttempt.success) { - log.error("failed to get lazy attempt payload", { reason: lazyAttempt.reason }); + if (!lazyAttemptResponse.success) { + log.error("failed to get lazy attempt payload", { + reason: lazyAttemptResponse.reason, + }); await crashRun({ name: "ReadyForLazyAttemptError", @@ -643,7 +685,7 @@ class TaskCoordinator { socket.emit("EXECUTE_TASK_RUN_LAZY_ATTEMPT", { version: "v1", - lazyPayload: lazyAttempt.lazyPayload, + lazyPayload: lazyAttemptResponse.lazyPayload, }); } catch (error) { if (error instanceof ChaosMonkey.Error) { @@ -949,20 +991,39 @@ class TaskCoordinator { log.addFields({ checkpoint }); - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_DURATION", - ms: message.ms, - now: message.now, - }, + const checkpointCreated = await backoff.execute(async () => { + return await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_DURATION", + ms: message.ms, + now: message.now, + }, + }); }); - if (ack?.keepRunAlive) { + if (!checkpointCreated.success) { + log.error("Failed to send CHECKPOINT_CREATED", { + error: serializeError(checkpointCreated.error), + cause: checkpointCreated.cause, + }); + + await crashRun({ + name: "WaitForDurationCheckpointError", + message: + checkpointCreated.error instanceof Error + ? `[${checkpointCreated.cause}] ${checkpointCreated.error.name}: ${checkpointCreated.error.message}` + : `[${checkpointCreated.cause}] ${checkpointCreated.error}`, + }); + + return; + } + + if (checkpointCreated.result?.keepRunAlive) { log.log("keeping run alive after duration checkpoint"); return; } @@ -1042,19 +1103,38 @@ class TaskCoordinator { socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage"); - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_TASK", - friendlyId: message.friendlyId, - }, + const checkpointCreated = await backoff.execute(async () => { + return await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_TASK", + friendlyId: message.friendlyId, + }, + }); }); - if (ack?.keepRunAlive) { + if (!checkpointCreated.success) { + log.error("Failed to send CHECKPOINT_CREATED", { + error: serializeError(checkpointCreated.error), + cause: checkpointCreated.cause, + }); + + await crashRun({ + name: "WaitForTaskCheckpointError", + message: + checkpointCreated.error instanceof Error + ? `[${checkpointCreated.cause}] ${checkpointCreated.error.name}: ${checkpointCreated.error.message}` + : `[${checkpointCreated.cause}] ${checkpointCreated.error}`, + }); + + return; + } + + if (checkpointCreated.result?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; log.log("keeping run alive after task checkpoint"); return; @@ -1135,20 +1215,39 @@ class TaskCoordinator { socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; log.log("WAIT_FOR_BATCH set checkpoint"); - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_BATCH", - batchFriendlyId: message.batchFriendlyId, - runFriendlyIds: message.runFriendlyIds, - }, + const checkpointCreated = await backoff.execute(async () => { + return await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_BATCH", + batchFriendlyId: message.batchFriendlyId, + runFriendlyIds: message.runFriendlyIds, + }, + }); }); - if (ack?.keepRunAlive) { + if (!checkpointCreated.success) { + log.error("Failed to send CHECKPOINT_CREATED", { + error: serializeError(checkpointCreated.error), + cause: checkpointCreated.cause, + }); + + await crashRun({ + name: "WaitForBatchCheckpointError", + message: + checkpointCreated.error instanceof Error + ? `[${checkpointCreated.cause}] ${checkpointCreated.error.name}: ${checkpointCreated.error.message}` + : `[${checkpointCreated.cause}] ${checkpointCreated.error}`, + }); + + return; + } + + if (checkpointCreated.result?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; log.log("keeping run alive after batch checkpoint"); return; @@ -1239,26 +1338,49 @@ class TaskCoordinator { try { await chaosMonkey.call({ throwErrors: false }); - const createAttempt = await this.#platformSocket?.sendWithAck( - "CREATE_TASK_RUN_ATTEMPT", - { - runId: message.runId, - envId: socket.data.envId, - } - ); + const createAttempt = await backoff + .max(3) // The run controller expects a reply in 15s so we can't have long retry delays + .maxRetries(5) + .execute(async () => { + return await this.#platformSocket?.sendWithAck("CREATE_TASK_RUN_ATTEMPT", { + runId: message.runId, + envId: socket.data.envId, + }); + }); + + if (!createAttempt.success) { + log.error("Failed to send CREATE_TASK_RUN_ATTEMPT", { + error: serializeError(createAttempt.error), + cause: createAttempt.cause, + }); + + callback({ + success: false, + reason: + createAttempt.error instanceof Error + ? `[${createAttempt.cause}] ${createAttempt.error.name}: ${createAttempt.error.message}` + : `[${createAttempt.cause}] ${createAttempt.error}`, + }); - if (!createAttempt?.success) { - log.debug("no ack while creating attempt", { reason: createAttempt?.reason }); - callback({ success: false, reason: createAttempt?.reason }); return; } - updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id); - updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number); + const createAttemptResponse = createAttempt.result; + + if (!createAttemptResponse?.success) { + log.debug("no ack while creating attempt", { reason: createAttemptResponse?.reason }); + callback({ success: false, reason: createAttemptResponse?.reason }); + return; + } + + const { executionPayload } = createAttemptResponse; + + updateAttemptFriendlyId(executionPayload.execution.attempt.id); + updateAttemptNumber(executionPayload.execution.attempt.number); callback({ success: true, - executionPayload: createAttempt.executionPayload, + executionPayload, }); } catch (error) { log.error("CREATE_TASK_RUN_ATTEMPT error", { error });