Skip to content

Commit 15816e9

Browse files
authored
Fix long retry delays (#2020)
* make testcontainers wait until container has stopped
* require unit tests for publishing again
* add failing test case
* make it pass
* add retry threshold ms env var and use it
1 parent a10e2a8 commit 15816e9

File tree

7 files changed

+77
-7
lines changed

7 files changed

+77
-7
lines changed

.github/workflows/publish.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@ jobs:
5656
secrets: inherit
5757

5858
publish-webapp:
59-
needs: [typecheck]
59+
needs: [typecheck, units]
6060
uses: ./.github/workflows/publish-webapp.yml
6161
secrets: inherit
6262
with:
6363
image_tag: ${{ inputs.image_tag }}
6464

6565
publish-worker:
66-
needs: [typecheck]
66+
needs: [typecheck, units]
6767
uses: ./.github/workflows/publish-worker.yml
6868
secrets: inherit
6969
with:

apps/webapp/app/env.server.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ const EnvironmentSchema = z.object({
460460
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
461461
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
462462
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
463+
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
463464

464465
RUN_ENGINE_WORKER_REDIS_HOST: z
465466
.string()
@@ -717,7 +718,7 @@ const EnvironmentSchema = z.object({
717718

718719
SLACK_BOT_TOKEN: z.string().optional(),
719720
SLACK_SIGNUP_REASON_CHANNEL_ID: z.string().optional(),
720-
721+
721722
// kapa.ai
722723
KAPA_AI_WEBSITE_ID: z.string().optional(),
723724
});

apps/webapp/app/v3/runEngine.server.ts

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ function createRunEngine() {
9595
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
9696
},
9797
},
98+
retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
9899
});
99100

100101
return engine;

internal-packages/run-engine/src/engine/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ export class RunEngine {
304304
waitpointSystem: this.waitpointSystem,
305305
delayedRunSystem: this.delayedRunSystem,
306306
machines: this.options.machines,
307+
retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs,
307308
});
308309

309310
this.dequeueSystem = new DequeueSystem({

internal-packages/run-engine/src/run-queue/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ export class RunQueue {
541541
}
542542
}
543543

544-
await this.#callNackMessage({ message });
544+
await this.#callNackMessage({ message, retryAt });
545545

546546
return true;
547547
},

internal-packages/run-engine/src/run-queue/tests/nack.test.ts

+61
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,65 @@ describe("RunQueue.nackMessage", () => {
214214
}
215215
}
216216
);
217+
218+
redisTest(
219+
"nacking a message with retryAt sets the correct requeue time",
220+
async ({ redisContainer }) => {
221+
const queue = new RunQueue({
222+
...testOptions,
223+
queueSelectionStrategy: new FairQueueSelectionStrategy({
224+
redis: {
225+
keyPrefix: "runqueue:test:",
226+
host: redisContainer.getHost(),
227+
port: redisContainer.getPort(),
228+
},
229+
keys: testOptions.keys,
230+
}),
231+
redis: {
232+
keyPrefix: "runqueue:test:",
233+
host: redisContainer.getHost(),
234+
port: redisContainer.getPort(),
235+
},
236+
});
237+
238+
try {
239+
const envMasterQueue = `env:${authenticatedEnvDev.id}`;
240+
241+
// Enqueue message
242+
await queue.enqueueMessage({
243+
env: authenticatedEnvDev,
244+
message: messageDev,
245+
masterQueues: ["main", envMasterQueue],
246+
});
247+
248+
// Dequeue message
249+
const dequeued = await queue.dequeueMessageFromMasterQueue(
250+
"test_12345",
251+
envMasterQueue,
252+
10
253+
);
254+
expect(dequeued.length).toBe(1);
255+
256+
// Set retryAt to 5 seconds in the future
257+
const retryAt = Date.now() + 5000;
258+
await queue.nackMessage({
259+
orgId: messageDev.orgId,
260+
messageId: messageDev.runId,
261+
retryAt,
262+
});
263+
264+
// Check the score of the message in the queue
265+
const queueKey = queue.keys.queueKey(authenticatedEnvDev, messageDev.queue);
266+
const score = await queue.oldestMessageInQueue(authenticatedEnvDev, messageDev.queue);
267+
expect(typeof score).toBe("number");
268+
if (typeof score !== "number") {
269+
throw new Error("Expected score to be a number, but got undefined");
270+
}
271+
// Should be within 100ms of retryAt
272+
expect(Math.abs(score - retryAt)).toBeLessThanOrEqual(100);
273+
} finally {
274+
await queue.quit();
275+
}
276+
}
277+
);
217278
});

internal-packages/testcontainers/src/index.ts

+9-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ const postgresContainer = async (
5353
try {
5454
await use(container);
5555
} finally {
56-
await container.stop();
56+
// WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately.
57+
// If you need to wait for the container to be stopped, you can provide a timeout. The timeout option here is specified in seconds.
58+
await container.stop({ timeout: 10 });
5759
}
5860
};
5961

@@ -92,7 +94,9 @@ const redisContainer = async (
9294
try {
9395
await use(container);
9496
} finally {
95-
await container.stop();
97+
// WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately.
98+
// If you need to wait for the container to be stopped, you can provide a timeout. The timeout option here is specified in seconds.
99+
await container.stop({ timeout: 10 });
96100
}
97101
};
98102

@@ -142,7 +146,9 @@ const electricOrigin = async (
142146
try {
143147
await use(origin);
144148
} finally {
145-
await container.stop();
149+
// WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately.
150+
// If you need to wait for the container to be stopped, you can provide a timeout. The timeout option here is specified in seconds.
151+
await container.stop({ timeout: 10 });
146152
}
147153
};
148154

0 commit comments

Comments
 (0)