From e730e65f83740c2c07f83a48ed397a9329bb966c Mon Sep 17 00:00:00 2001 From: Ani Ravi <5902976+aniravi24@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:12:42 -0400 Subject: [PATCH 1/3] feat: provide custom payload transformer --- packages/sidetrack/src/effect.ts | 35 ++++++++++++++++++++++++++++---- packages/sidetrack/src/types.ts | 16 +++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/packages/sidetrack/src/effect.ts b/packages/sidetrack/src/effect.ts index db252537..1e718d68 100644 --- a/packages/sidetrack/src/effect.ts +++ b/packages/sidetrack/src/effect.ts @@ -154,6 +154,30 @@ export function makeLayer( ); })()); + const globalPayloadTransformer = layerOptions.payloadTransformer; + + const payloadSerializer = ( + queueName: K, + payload: Queues[K], + ) => + queues[queueName].payloadTransformer + ? (queues[queueName].payloadTransformer.serialize(payload) as Queues[K]) + : globalPayloadTransformer + ? (globalPayloadTransformer.serialize(payload) as Queues[K]) + : payload; + + const payloadDeserializer = ( + queueName: K, + payload: Queues[K], + ) => + queues[queueName].payloadTransformer + ? (queues[queueName].payloadTransformer.deserialize( + payload, + ) as Queues[K]) + : globalPayloadTransformer + ? (globalPayloadTransformer.deserialize(payload) as Queues[K]) + : payload; + const pollingFiber = Ref.unsafeMake>( Fiber.void, ); @@ -269,9 +293,12 @@ export function makeLayer( return new SidetrackJobRunError(e); }, try: () => - queues[job.queue].run(job.payload as Queues[string], { - job: job as SidetrackJob, - }), + queues[job.queue].run( + payloadDeserializer(job.queue, job.payload as Queues[string]), + { + job: job as SidetrackJob, + }, + ), }).pipe( Effect.flatMap(() => Effect.promise(() => @@ -357,7 +384,7 @@ export function makeLayer( RETURNING *`, [ queueName, - payload, + payloadSerializer(queueName, payload), queues[queueName].options?.maxAttempts, options?.scheduledAt, options?.uniqueKey, diff --git a/packages/sidetrack/src/types.ts b/packages/sidetrack/src/types.ts index 8ec0086f..6f05715c 100644 --- a/packages/sidetrack/src/types.ts +++ b/packages/sidetrack/src/types.ts @@ -64,6 +64,7 @@ export interface SidetrackOptions { connectionString: string; }; dbClient?: SidetrackDatabaseClient; + payloadTransformer?: SidetrackPayloadTransformer; queues: SidetrackQueues; } @@ -82,6 +83,7 @@ export type SidetrackQueues> = { options?: { maxAttempts?: number; }; + payloadTransformer?: SidetrackPayloadTransformer; run: ( payload: Queues[K], context: { job: SidetrackJob }, @@ -93,3 +95,17 @@ export type SidetrackQueuesGenericType = Record< string, Record >; + +export interface SidetrackPayloadTransformer { + /** + * Transform payload prior to running the job. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + deserialize(payload: T): any; + + /** + * Transform payload prior to storing in the database + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + serialize(payload: T): any; +} From b66abf3cf466b989f19f95bd8a30fee3ce199d6b Mon Sep 17 00:00:00 2001 From: Ani Ravi <5902976+aniravi24@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:11:05 -0400 Subject: [PATCH 2/3] feat: allow Effect.Duration to be passed in for polling interval --- packages/sidetrack/src/effect.ts | 8 ++++++-- packages/sidetrack/src/types.ts | 7 ++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/sidetrack/src/effect.ts b/packages/sidetrack/src/effect.ts index 4d290829..3407fcfa 100644 --- a/packages/sidetrack/src/effect.ts +++ b/packages/sidetrack/src/effect.ts @@ -178,7 +178,11 @@ export function makeLayer( ? (globalPayloadTransformer.deserialize(payload) as Queues[K]) : payload; - const pollingIntervalMs = layerOptions.pollingIntervalMs ?? 2000; + const pollingIntervalMs = Duration.isDuration(layerOptions.pollingInterval) + ? layerOptions.pollingInterval + : layerOptions.pollingInterval + ? Duration.millis(layerOptions.pollingInterval) + : Duration.millis(2000); const pollingFiber = Ref.unsafeMake>( Fiber.void, @@ -233,7 +237,7 @@ export function makeLayer( ), ), // TODO customize polling and decrease polling time potentially? - Effect.repeat(Schedule.spaced(Duration.millis(pollingIntervalMs))), + Effect.repeat(Schedule.spaced(pollingIntervalMs)), Effect.catchAllCause(Effect.logError), Effect.forkDaemon, Effect.flatMap((fiber) => Ref.update(pollingFiber, () => fiber)), diff --git a/packages/sidetrack/src/types.ts b/packages/sidetrack/src/types.ts index 51388a25..a3617030 100644 --- a/packages/sidetrack/src/types.ts +++ b/packages/sidetrack/src/types.ts @@ -1,3 +1,4 @@ +import { Duration } from "effect"; import { JsonValue } from "type-fest"; import { SidetrackDatabaseClient } from "./client"; @@ -65,7 +66,11 @@ export interface SidetrackOptions { }; dbClient?: SidetrackDatabaseClient; payloadTransformer?: SidetrackPayloadTransformer; - pollingIntervalMs?: number; + /** + * Number of milliseconds to wait between polling for new jobs + * Alternatively, pass in an Effect.Duration of any duration + */ + pollingInterval?: Duration.Duration | number; queues: SidetrackQueues; } From 603dd876ad9e6e14fe6e8481b97659ff979a7dab Mon Sep 17 00:00:00 2001 From: Ani Ravi <5902976+aniravi24@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:28:31 -0400 Subject: [PATCH 3/3] test(sidetrack): payload transformer test --- packages/pg-migrate/package.json | 1 - packages/sidetrack/package.json | 1 - packages/sidetrack/src/types.ts | 7 ++-- packages/sidetrack/test/index.test.ts | 49 +++++++++++++++++++++++++++ pnpm-lock.yaml | 12 ------- 5 files changed, 52 insertions(+), 18 deletions(-) diff --git a/packages/pg-migrate/package.json b/packages/pg-migrate/package.json index c2301eb7..694c32bf 100644 --- a/packages/pg-migrate/package.json +++ b/packages/pg-migrate/package.json @@ -53,7 +53,6 @@ "@vitest/coverage-v8": "2.1.3", "dotenv-cli": "7.4.2", "pg": "8.13.0", - "type-fest": "4.26.1", "vite": "5.4.9", "vite-tsconfig-paths": "5.0.1", "vitest": "2.1.3" diff --git a/packages/sidetrack/package.json b/packages/sidetrack/package.json index a7dbe1ec..66564190 100644 --- a/packages/sidetrack/package.json +++ b/packages/sidetrack/package.json @@ -66,7 +66,6 @@ "@vitest/coverage-v8": "2.1.3", "dotenv-cli": "7.4.2", "kanel": "3.10.1", - "type-fest": "4.26.1", "vite": "5.4.9", "vite-tsconfig-paths": "5.0.1", "vitest": "2.1.3" diff --git a/packages/sidetrack/src/types.ts b/packages/sidetrack/src/types.ts index a3617030..d0868a6f 100644 --- a/packages/sidetrack/src/types.ts +++ b/packages/sidetrack/src/types.ts @@ -1,5 +1,4 @@ import { Duration } from "effect"; -import { JsonValue } from "type-fest"; import { SidetrackDatabaseClient } from "./client"; import SidetrackJobs from "./models/generated/public/SidetrackJobs"; @@ -79,12 +78,12 @@ export class SidetrackJobRunError { constructor(readonly error: unknown) {} } -export type SidetrackJob = Omit< +export type SidetrackJob = Omit< SidetrackJobs, "payload" > & { payload: Payload }; -export type SidetrackQueues> = { +export type SidetrackQueues> = { [K in keyof Queues]: { options?: { maxAttempts?: number; @@ -99,7 +98,7 @@ export type SidetrackQueues> = { export type SidetrackQueuesGenericType = Record< string, - Record + Record >; export interface SidetrackPayloadTransformer { diff --git a/packages/sidetrack/test/index.test.ts b/packages/sidetrack/test/index.test.ts index 8a13f225..8d1a2cf7 100644 --- a/packages/sidetrack/test/index.test.ts +++ b/packages/sidetrack/test/index.test.ts @@ -320,4 +320,53 @@ describe.concurrent("jobs", () => { expect(jobsAfterRun[0].payload).toEqual({ message: "Future job" }); }); }); + + it("payload transformer works", async () => { + await runInTransaction(async (client) => { + const sidetrack = new SidetrackTest<{ + test: { date: Date }; + }>({ + dbClient: usePg(client), + payloadTransformer: { + serialize: (payload) => { + if ((payload as any).date instanceof Date) { + return { + ...payload, + date: (payload as any).date.toISOString(), + }; + } + return payload; + }, + deserialize: (payload) => { + if (typeof (payload as any).date === "string") { + return { + ...payload, + date: new Date((payload as any).date), + }; + } + return payload; + }, + }, + queues: { + test: { + run: async (payload, { job }) => { + expect(payload.date).toBeInstanceOf(Date); + // The original payload is serialized, so the job payload is a string + expect((job.payload as any).date).toBeTypeOf("string"); + return payload; + }, + }, + }, + }); + + const date = new Date(); + const job = await sidetrack.insertJob("test", { date }); + + const retrievedJob = await sidetrack.getJob(job.id); + expect((retrievedJob.payload as any).date).toBeTypeOf("string"); + + await sidetrack.runJob(job.id); + expect((await sidetrack.getJob(job.id)).status).toBe("completed"); + }); + }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 02d62780..ce18ba65 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -139,9 +139,6 @@ importers: pg: specifier: 8.13.0 version: 8.13.0 - type-fest: - specifier: 4.26.1 - version: 4.26.1 vite: specifier: 5.4.9 version: 5.4.9(@types/node@22.5.2) @@ -176,9 +173,6 @@ importers: kanel: specifier: 3.10.1 version: 3.10.1 - type-fest: - specifier: 4.26.1 - version: 4.26.1 vite: specifier: 5.4.9 version: 5.4.9(@types/node@22.5.2) @@ -3248,10 +3242,6 @@ packages: resolution: {integrity: sha512-t0rzBq87m3fVcduHDUFhKmyyX+9eo6WQjZvf51Ea/M0Q7+T374Jp1aUiyUl0GKxp8M/OETVHSDvmkyPgvX+X2w==} engines: {node: '>=10'} - type-fest@4.26.1: - resolution: {integrity: sha512-yOGpmOAL7CkKe/91I5O3gPICmJNLJ1G4zFYVAsRHg7M64biSnPtRj0WNQt++bRkjYOqjWXrhnUw1utzmVErAdg==} - engines: {node: '>=16'} - typed-array-buffer@1.0.2: resolution: {integrity: sha512-gEymJYKZtKXzzBzM4jqa9w6Q1Jjm7x2d+sh19AdsD4wqnMPDYyvwpsIc2Q/835kHuo3BEQ7CjelGhfTsoBb2MQ==} engines: {node: '>= 0.4'} @@ -6673,8 +6663,6 @@ snapshots: type-fest@0.21.3: {} - type-fest@4.26.1: {} - typed-array-buffer@1.0.2: dependencies: call-bind: 1.0.7