Skip to content

Commit

Permalink
Merge pull request #218 from sidetracklabs/payload-transformer
Browse files Browse the repository at this point in the history
feat: support custom payload transformers
  • Loading branch information
aniravi24 authored Oct 23, 2024
2 parents e6517f3 + 603dd87 commit 8988ac9
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 25 deletions.
1 change: 0 additions & 1 deletion packages/pg-migrate/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
"@vitest/coverage-v8": "2.1.3",
"dotenv-cli": "7.4.2",
"pg": "8.13.0",
"type-fest": "4.26.1",
"vite": "5.4.9",
"vite-tsconfig-paths": "5.0.1",
"vitest": "2.1.3"
Expand Down
1 change: 0 additions & 1 deletion packages/sidetrack/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
"@vitest/coverage-v8": "2.1.3",
"dotenv-cli": "7.4.2",
"kanel": "3.10.1",
"type-fest": "4.26.1",
"vite": "5.4.9",
"vite-tsconfig-paths": "5.0.1",
"vitest": "2.1.3"
Expand Down
43 changes: 37 additions & 6 deletions packages/sidetrack/src/effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,35 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
);
})());

const pollingIntervalMs = layerOptions.pollingIntervalMs ?? 2000;
const globalPayloadTransformer = layerOptions.payloadTransformer;

const payloadSerializer = <K extends keyof Queues>(
queueName: K,
payload: Queues[K],
) =>
queues[queueName].payloadTransformer
? (queues[queueName].payloadTransformer.serialize(payload) as Queues[K])
: globalPayloadTransformer
? (globalPayloadTransformer.serialize(payload) as Queues[K])
: payload;

const payloadDeserializer = <K extends keyof Queues>(
queueName: K,
payload: Queues[K],
) =>
queues[queueName].payloadTransformer
? (queues[queueName].payloadTransformer.deserialize(
payload,
) as Queues[K])
: globalPayloadTransformer
? (globalPayloadTransformer.deserialize(payload) as Queues[K])
: payload;

const pollingIntervalMs = Duration.isDuration(layerOptions.pollingInterval)
? layerOptions.pollingInterval
: layerOptions.pollingInterval
? Duration.millis(layerOptions.pollingInterval)
: Duration.millis(2000);

const pollingFiber = Ref.unsafeMake<Fiber.Fiber<unknown, unknown>>(
Fiber.void,
Expand Down Expand Up @@ -209,7 +237,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
),
),
// TODO customize polling and decrease polling time potentially?
Effect.repeat(Schedule.spaced(Duration.millis(pollingIntervalMs))),
Effect.repeat(Schedule.spaced(pollingIntervalMs)),
Effect.catchAllCause(Effect.logError),
Effect.forkDaemon,
Effect.flatMap((fiber) => Ref.update(pollingFiber, () => fiber)),
Expand Down Expand Up @@ -271,9 +299,12 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
return new SidetrackJobRunError(e);
},
try: () =>
queues[job.queue].run(job.payload as Queues[string], {
job: job as SidetrackJob<Queues[string]>,
}),
queues[job.queue].run(
payloadDeserializer(job.queue, job.payload as Queues[string]),
{
job: job as SidetrackJob<Queues[string]>,
},
),
}).pipe(
Effect.flatMap(() =>
Effect.promise(() =>
Expand Down Expand Up @@ -359,7 +390,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
RETURNING *`,
[
queueName,
payload,
payloadSerializer(queueName, payload),
queues[queueName].options?.maxAttempts,
options?.scheduledAt,
options?.uniqueKey,
Expand Down
30 changes: 25 additions & 5 deletions packages/sidetrack/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JsonValue } from "type-fest";
import { Duration } from "effect";

import { SidetrackDatabaseClient } from "./client";
import SidetrackJobs from "./models/generated/public/SidetrackJobs";
Expand Down Expand Up @@ -64,7 +64,12 @@ export interface SidetrackOptions<Queues extends SidetrackQueuesGenericType> {
connectionString: string;
};
dbClient?: SidetrackDatabaseClient;
pollingIntervalMs?: number;
payloadTransformer?: SidetrackPayloadTransformer;
/**
* Number of milliseconds to wait between polling for new jobs
* Alternatively, pass in an Effect.Duration of any duration
*/
pollingInterval?: Duration.Duration | number;
queues: SidetrackQueues<Queues>;
}

Expand All @@ -73,16 +78,17 @@ export class SidetrackJobRunError {
constructor(readonly error: unknown) {}
}

export type SidetrackJob<Payload extends JsonValue> = Omit<
export type SidetrackJob<Payload extends unknown> = Omit<
SidetrackJobs,
"payload"
> & { payload: Payload };

export type SidetrackQueues<Queues extends Record<string, JsonValue>> = {
export type SidetrackQueues<Queues extends Record<string, unknown>> = {
[K in keyof Queues]: {
options?: {
maxAttempts?: number;
};
payloadTransformer?: SidetrackPayloadTransformer;
run: (
payload: Queues[K],
context: { job: SidetrackJob<Queues[K]> },
Expand All @@ -92,5 +98,19 @@ export type SidetrackQueues<Queues extends Record<string, JsonValue>> = {

export type SidetrackQueuesGenericType = Record<
string,
Record<string, JsonValue>
Record<string, unknown>
>;

export interface SidetrackPayloadTransformer {
/**
* Transform payload prior to running the job.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
deserialize<T>(payload: T): any;

/**
* Transform payload prior to storing in the database
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
serialize<T>(payload: T): any;
}
49 changes: 49 additions & 0 deletions packages/sidetrack/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,53 @@ describe.concurrent("jobs", () => {
expect(jobsAfterRun[0].payload).toEqual({ message: "Future job" });
});
});

it("payload transformer works", async () => {
await runInTransaction(async (client) => {
const sidetrack = new SidetrackTest<{
test: { date: Date };
}>({
dbClient: usePg(client),
payloadTransformer: {
serialize: (payload) => {
if ((payload as any).date instanceof Date) {
return {
...payload,
date: (payload as any).date.toISOString(),
};
}
return payload;
},
deserialize: (payload) => {
if (typeof (payload as any).date === "string") {
return {
...payload,
date: new Date((payload as any).date),
};
}
return payload;
},
},
queues: {
test: {
run: async (payload, { job }) => {
expect(payload.date).toBeInstanceOf(Date);
// The original payload is serialized, so the job payload is a string
expect((job.payload as any).date).toBeTypeOf("string");
return payload;
},
},
},
});

const date = new Date();
const job = await sidetrack.insertJob("test", { date });

const retrievedJob = await sidetrack.getJob(job.id);
expect((retrievedJob.payload as any).date).toBeTypeOf("string");

await sidetrack.runJob(job.id);
expect((await sidetrack.getJob(job.id)).status).toBe("completed");
});
});
});
12 changes: 0 additions & 12 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8988ac9

Please sign in to comment.