Skip to content

Commit

Permalink
Merge pull request #270 from sidetracklabs/effect-3.13-refactor
Browse files Browse the repository at this point in the history
effect module improvements
  • Loading branch information
aniravi24 authored Feb 15, 2025
2 parents 6790194 + 4b75b49 commit 8b6f57e
Show file tree
Hide file tree
Showing 12 changed files with 624 additions and 244 deletions.
7 changes: 7 additions & 0 deletions .changeset/two-foxes-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"sidetrack": patch
---

Breaking changes to Effect export

rename `makeLayer` to `layer` and `createSidetrackServiceTag` to `getSidetrackService`
9 changes: 5 additions & 4 deletions packages/sidetrack/.kanelrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ const { defaultGenerateIdentifierType } = require("kanel");
/** @type {import('kanel').Config} */
module.exports = {
connection: { connectionString: process.env.DATABASE_URL },
// customTypeMap: {
// "pg_catalog.tsvector": "string",
// "pg_catalog.bpchar": "string",
// },
enumStyle: "type",
generateIdentifierType: (c, d, config) => {
const defaultResult = defaultGenerateIdentifierType(c, d, config);
// Remove the brand from the type definition
Expand All @@ -12,8 +17,4 @@ module.exports = {
},
outputPath: "./src/models/generated",
preDeleteOutputFolder: true,
// customTypeMap: {
// "pg_catalog.tsvector": "string",
// "pg_catalog.bpchar": "string",
// },
};
2 changes: 1 addition & 1 deletion packages/sidetrack/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
},
"dependencies": {
"@sidetrack/pg-migrate": "workspace:*",
"effect": "3.12.11",
"effect": "3.13.1",
"pg": "^8.13.3"
},
"devDependencies": {
Expand Down
10 changes: 6 additions & 4 deletions packages/sidetrack/src/effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export interface SidetrackService<Queues extends SidetrackQueuesGenericType> {
*/
listJobStatuses: <K extends keyof Queues>(
options?: SidetrackListJobStatusesOptions<Queues, K>,
) => Effect.Effect<Record<SidetrackJobStatusEnum, number>>;
) => Effect.Effect<Partial<Record<SidetrackJobStatusEnum, number>>>;
/**
* Test utility to get a list of jobs
*/
Expand Down Expand Up @@ -138,17 +138,17 @@ const pollingIntervalMs = (
? Duration.millis(pollingInterval)
: Duration.millis(defaultValue);

export const createSidetrackServiceTag = <
export const getSidetrackService = <
Queues extends SidetrackQueuesGenericType,
>() =>
Context.GenericTag<SidetrackService<Queues>>(
"@sidetracklabs/sidetrack/effect/service",
);

export function makeLayer<Queues extends SidetrackQueuesGenericType>(
export function layer<Queues extends SidetrackQueuesGenericType>(
layerOptions: SidetrackOptions<Queues>,
): Layer.Layer<SidetrackService<Queues>> {
return Layer.sync(createSidetrackServiceTag<Queues>(), () => {
return Layer.sync(getSidetrackService<Queues>(), () => {
const queues = layerOptions.queues;
const databaseOptions = layerOptions.databaseOptions;
const pool =
Expand All @@ -172,6 +172,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
queueName: K,
payload: Queues[K],
) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
queues[queueName].payloadTransformer
? (queues[queueName].payloadTransformer.serialize(payload) as Queues[K])
: globalPayloadTransformer
Expand All @@ -182,6 +183,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
queueName: K,
payload: Queues[K],
) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
queues[queueName].payloadTransformer
? (queues[queueName].payloadTransformer.deserialize(
payload,
Expand Down
58 changes: 21 additions & 37 deletions packages/sidetrack/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import * as Runtime from "effect/Runtime";
import * as ManagedRuntime from "effect/ManagedRuntime";

import {
createSidetrackServiceTag,
makeLayer,
SidetrackService,
} from "./effect";
import { getSidetrackService, layer, SidetrackService } from "./effect";
import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs";
import SidetrackJobs from "./models/generated/public/SidetrackJobs";
import SidetrackJobStatusEnum from "./models/generated/public/SidetrackJobStatusEnum";
Expand All @@ -31,38 +26,27 @@ import {
*/
export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
/** @internal */
protected sidetrackService = createSidetrackServiceTag<Queues>();
/** @internal */
private sidetrackLayer: Layer.Layer<SidetrackService<Queues>>;
/** @internal */
private runtime: Runtime.Runtime<SidetrackService<Queues>>;
protected sidetrackService = getSidetrackService<Queues>();
/** @internal */
protected customRunPromise: <R extends SidetrackService<Queues>, E, A>(
self: Effect.Effect<A, E, R>,
) => Promise<A>;
protected managedRuntime: ManagedRuntime.ManagedRuntime<
SidetrackService<Queues>,
never
>;

constructor(options: SidetrackOptions<Queues>) {
this.sidetrackLayer = makeLayer(options);

this.runtime = Effect.runSync(
Effect.scoped(Layer.toRuntime(this.sidetrackLayer)),
);

this.customRunPromise = <R extends SidetrackService<Queues>, E, A>(
self: Effect.Effect<A, E, R>,
) => Runtime.runPromise(this.runtime)(self);
this.managedRuntime = ManagedRuntime.make(layer(options));
}

async cancelJob(jobId: string, options?: SidetrackCancelJobOptions) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.cancelJob(jobId, options),
),
);
}

async deleteJob(jobId: string, options?: SidetrackDeleteJobOptions) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.deleteJob(jobId, options),
),
Expand All @@ -73,7 +57,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
jobId: string,
options?: SidetrackGetJobOptions,
): Promise<SidetrackJobs> {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.getJob(jobId, options),
),
Expand All @@ -85,7 +69,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
payload: Queues[K],
options?: SidetrackInsertJobOptions,
): Promise<SidetrackJobs> {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.insertJob(queueName, payload, options),
),
Expand All @@ -103,7 +87,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
payload: Queues[K],
options?: SidetrackCronJobOptions,
): Promise<SidetrackCronJobs> {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.scheduleCron(queueName, cronExpression, payload, options),
),
Expand All @@ -120,7 +104,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
cronExpression: string,
options?: SidetrackDeactivateCronScheduleOptions,
) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.deactivateCronSchedule(queueName, cronExpression, options),
),
Expand All @@ -137,7 +121,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
cronExpression: string,
options?: SidetrackDeleteCronScheduleOptions,
) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.deleteCronSchedule(queueName, cronExpression, options),
),
Expand All @@ -148,7 +132,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
* Automatically run migrations and start polling the DB for jobs
*/
async start() {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) => service.start()),
);
}
Expand All @@ -157,7 +141,7 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
* Turn off polling
*/
async stop() {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) => service.stop()),
);
}
Expand All @@ -176,7 +160,7 @@ export class SidetrackTest<
async listJobs<K extends keyof Queues>(
options?: SidetrackListJobsOptions<Queues, K>,
) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.testUtils.listJobs(options),
),
Expand All @@ -189,7 +173,7 @@ export class SidetrackTest<
async listJobStatuses<K extends keyof Queues>(
options?: SidetrackListJobStatusesOptions<Queues, K>,
) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.testUtils.listJobStatuses(options),
),
Expand All @@ -200,7 +184,7 @@ export class SidetrackTest<
* Test utility to run a job manually without polling
*/
async runJob(jobId: string, options?: SidetrackRunJobOptions) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.testUtils.runJob(jobId, options),
),
Expand All @@ -213,7 +197,7 @@ export class SidetrackTest<
async runJobs<K extends keyof Queues>(
options?: SidetrackRunJobsOptions<Queues, K>,
) {
return this.customRunPromise(
return this.managedRuntime.runPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.testUtils.runJobs(options),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// This file is automatically generated by Kanel. Do not modify manually.

/** Represents the enum public.sidetrack_cron_job_status_enum */
enum SidetrackCronJobStatusEnum {
active = 'active',
inactive = 'inactive',
};
type SidetrackCronJobStatusEnum =
| 'active'
| 'inactive';

export default SidetrackCronJobStatusEnum;
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
// This file is automatically generated by Kanel. Do not modify manually.

/** Represents the enum public.sidetrack_job_status_enum */
enum SidetrackJobStatusEnum {
scheduled = 'scheduled',
running = 'running',
cancelled = 'cancelled',
failed = 'failed',
retrying = 'retrying',
completed = 'completed',
};
type SidetrackJobStatusEnum =
| 'scheduled'
| 'running'
| 'cancelled'
| 'failed'
| 'retrying'
| 'completed';

export default SidetrackJobStatusEnum;
7 changes: 3 additions & 4 deletions packages/sidetrack/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ export type SidetrackQueues<Queues extends Record<string, unknown>> = {
};
};

export type SidetrackQueuesGenericType = Record<
string,
Record<string, unknown>
>;
// TODO making this type more specific causes issues with TypeScript when passing in types/interfaces for the queue types
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type SidetrackQueuesGenericType = Record<string, any>;

export interface SidetrackPayloadTransformer {
/**
Expand Down
Loading

0 comments on commit 8b6f57e

Please sign in to comment.