Skip to content

Commit

Permalink
refactor(sidetrack): poll each queue separately
Browse files Browse the repository at this point in the history
  • Loading branch information
aniravi24 committed Oct 26, 2024
1 parent 8988ac9 commit 6b64967
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 89 deletions.
4 changes: 1 addition & 3 deletions packages/client-prisma/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ describe("jobs", () => {
dbClient: usePrisma(new PrismaClient()),
queues: {
test: {
options: {
maxAttempts: 2,
},
maxAttempts: 2,
run: async (_payload) => {
throw new Error("failure");
},
Expand Down
2 changes: 1 addition & 1 deletion packages/pg-migrate/src/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class Migration implements RunMigration {
static async create(
name: string,
directory: string,
_language?: CreateOptions | "js" | "sql" | "ts" ,
_language?: CreateOptions | "js" | "sql" | "ts",
_ignorePattern?: string,
_filenameFormat?: FilenameFormat,
) {
Expand Down
120 changes: 66 additions & 54 deletions packages/sidetrack/src/effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
import { fromIterableWith } from "effect/Record";
import * as Record from "effect/Record";
import * as Ref from "effect/Ref";
import * as Schedule from "effect/Schedule";
import * as Stream from "effect/Stream";
import * as Supervisor from "effect/Supervisor";
import pg from "pg";

import { SidetrackDatabaseClient, usePg } from "./client";
Expand All @@ -17,6 +17,7 @@ import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs";
import SidetrackJobs from "./models/generated/public/SidetrackJobs";
import SidetrackJobStatusEnum from "./models/generated/public/SidetrackJobStatusEnum";
import {
PollingInterval,
SidetrackCancelJobOptions,
SidetrackCronJobOptions,
SidetrackDeactivateCronScheduleOptions,
Expand Down Expand Up @@ -127,6 +128,16 @@ export interface SidetrackService<Queues extends SidetrackQueuesGenericType> {
};
}

const pollingIntervalMs = (
pollingInterval?: PollingInterval,
defaultValue = 2000,
) =>
Duration.isDuration(pollingInterval)
? pollingInterval
: pollingInterval
? Duration.millis(pollingInterval)
: Duration.millis(defaultValue);

export const createSidetrackServiceTag = <
Queues extends SidetrackQueuesGenericType,
>() =>
Expand All @@ -144,6 +155,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
!layerOptions.dbClient && databaseOptions
? new pg.Pool(databaseOptions)
: undefined;

const dbClient: SidetrackDatabaseClient =
layerOptions.dbClient ??
(pool
Expand Down Expand Up @@ -178,32 +190,30 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
? (globalPayloadTransformer.deserialize(payload) as Queues[K])
: payload;

const pollingIntervalMs = Duration.isDuration(layerOptions.pollingInterval)
? layerOptions.pollingInterval
: layerOptions.pollingInterval
? Duration.millis(layerOptions.pollingInterval)
: Duration.millis(2000);
const pollingInterval = pollingIntervalMs(layerOptions.pollingInterval);

const pollingFiber = Ref.unsafeMake<Fiber.Fiber<unknown, unknown>>(
Fiber.void,
);
const pollingSupervisor = Supervisor.unsafeTrack();

// TODO should we use a hashmap or supervisor? We'll need to convert this layer into an Effect
const cronFibers = Ref.unsafeMake(
Record.empty<string, Fiber.Fiber<unknown, unknown>>(),
);
const cronSupervisor = Supervisor.unsafeTrack();

/**
* Each queue is polled separately for new jobs, and the polling interval can be configured per queue
*/
const startPolling = () =>
Effect.promise(() =>
dbClient.execute<SidetrackJobs>(
`WITH next_jobs AS (
Effect.forEach(
Record.toEntries(queues),
([queueName, queue]) =>
Effect.promise(() =>
dbClient.execute<SidetrackJobs>(
`WITH next_jobs AS (
SELECT
id
FROM
sidetrack_jobs
WHERE
(status = 'scheduled' or status = 'retrying')
AND scheduled_at <= NOW()
AND queue = $1
ORDER BY
scheduled_at
FOR UPDATE SKIP LOCKED
Expand All @@ -220,27 +230,35 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
FROM
next_jobs
) RETURNING *`,
),
).pipe(
Effect.map((result) => result.rows),
Effect.flatMap((result) =>
Effect.forEach(
result,
(job) =>
executeJobRunner(job).pipe(
Effect.catchAllCause(Effect.logError),
Effect.forkDaemon,
[queueName],
),
).pipe(
Effect.map((result) => result.rows),
Effect.flatMap((result) =>
Effect.forEach(
result,
(job) =>
executeJobRunner(job).pipe(
Effect.catchAllCause(Effect.logError),
Effect.forkDaemon,
),
{
concurrency: "inherit",
},
),
{
concurrency: "inherit",
},
),
Effect.repeat(
Schedule.spaced(
queue.pollingInterval
? pollingIntervalMs(queue.pollingInterval)
: pollingInterval,
),
),
Effect.catchAllCause(Effect.logError),
Effect.supervised(pollingSupervisor),
Effect.fork,
),
),
// TODO customize polling and decrease polling time potentially?
Effect.repeat(Schedule.spaced(pollingIntervalMs)),
Effect.catchAllCause(Effect.logError),
Effect.forkDaemon,
Effect.flatMap((fiber) => Ref.update(pollingFiber, () => fiber)),
{ concurrency: "inherit" },
);

const start = () =>
Expand Down Expand Up @@ -276,19 +294,17 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
).pipe(Effect.asVoid);

const stop = () =>
Effect.all([
Ref.get(pollingFiber).pipe(
Effect.flatMap((fiber) => Fiber.interrupt(fiber)),
),
Ref.get(cronFibers).pipe(
Effect.flatMap((fibers) =>
Effect.all(
Record.values(fibers).map((fiber) => Fiber.interrupt(fiber)),
{ concurrency: "inherit" },
),
Effect.all(
[
Effect.flatMap(pollingSupervisor.value, (fibers) =>
Fiber.interruptAll(fibers),
),
),
]).pipe(Effect.asVoid);
Effect.flatMap(cronSupervisor.value, (fibers) =>
Fiber.interruptAll(fibers),
),
],
{ concurrency: "inherit" },
).pipe(Effect.asVoid);

const executeJobRunner = (
job: SidetrackJobs,
Expand Down Expand Up @@ -391,7 +407,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
[
queueName,
payloadSerializer(queueName, payload),
queues[queueName].options?.maxAttempts,
queues[queueName]?.maxAttempts,
options?.scheduledAt,
options?.uniqueKey,
],
Expand Down Expand Up @@ -464,12 +480,8 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
),
Stream.catchAllCause(Effect.logError),
Stream.runDrain,
Effect.forkDaemon,
Effect.flatMap((fiber) =>
Ref.update(cronFibers, (fibers) =>
Record.set(fibers, cronJob.id, fiber),
),
),
Effect.supervised(cronSupervisor),
Effect.fork,
);
};

Expand Down
2 changes: 1 addition & 1 deletion packages/sidetrack/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class SidetrackTest<
* Test utility to get a list of jobs
*/
async listJobs<K extends keyof Queues>(
options?: SidetrackListJobsOptions<Queues, K> ,
options?: SidetrackListJobsOptions<Queues, K>,
) {
return this.customRunPromise(
Effect.flatMap(this.sidetrackService, (service) =>
Expand Down
20 changes: 12 additions & 8 deletions packages/sidetrack/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export interface SidetrackRunJobsOptions<
queue?: K | K[] | undefined;
}

export type PollingInterval = Duration.Duration | number;

export interface SidetrackOptions<Queues extends SidetrackQueuesGenericType> {
databaseOptions?: {
connectionString: string;
Expand All @@ -69,7 +71,7 @@ export interface SidetrackOptions<Queues extends SidetrackQueuesGenericType> {
* Number of milliseconds to wait between polling for new jobs
* Alternatively, pass in an Effect.Duration of any duration
*/
pollingInterval?: Duration.Duration | number;
pollingInterval?: PollingInterval;
queues: SidetrackQueues<Queues>;
}

Expand All @@ -78,17 +80,19 @@ export class SidetrackJobRunError {
constructor(readonly error: unknown) {}
}

export type SidetrackJob<Payload extends unknown> = Omit<
SidetrackJobs,
"payload"
> & { payload: Payload };
export type SidetrackJob<Payload> = Omit<SidetrackJobs, "payload"> & {
payload: Payload;
};

export type SidetrackQueues<Queues extends Record<string, unknown>> = {
[K in keyof Queues]: {
options?: {
maxAttempts?: number;
};
maxAttempts?: number;
payloadTransformer?: SidetrackPayloadTransformer;
/**
* Number of milliseconds to wait between polling for new jobs
* Alternatively, pass in an Effect.Duration of any duration
*/
pollingInterval?: PollingInterval;
run: (
payload: Queues[K],
context: { job: SidetrackJob<Queues[K]> },
Expand Down
Loading

0 comments on commit 6b64967

Please sign in to comment.