Skip to content

Commit

Permalink
feat: new function for queue and adhoc jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
JannikZed committed Aug 15, 2024
1 parent 7c94b2c commit 2747c72
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 7 deletions.
46 changes: 44 additions & 2 deletions pkg/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,44 @@ export class WorkflowScheduler {
*/
}

/**
* generate the queue name for bullmq. We are using the workflow name and unique job identifiers
* @param queueNameGroups
* @param workflowName
* @returns
*/
private getQueueName(
queueNameGroups: (string | number)[],
workflowName: string,
) {
return ["eci", ...queueNameGroups, workflowName].join(":");
}

/**
* looks for a scheduled job and calls promote on it
* @param queueNameGroups
* @param workflowName
*/
public async promoteJob(
queueNameGroups: (string | number)[],
workflowName: string,
) {
const queueName = this.getQueueName(queueNameGroups, workflowName);
const queue = new Queue(queueName, {
connection: this.redisConnection,
});
const job = await queue.getDelayed();
if (job.length > 0) {
await job[0].promote();
this.logger.info("Promoted job", { queueName, workflowName });
} else {
this.logger.warn("No job found to promote", {
queueName,
workflowName,
});
}
}

public async schedule(
workflow: WorkflowFactory,
config: {
Expand All @@ -55,6 +93,10 @@ export class WorkflowScheduler {
*/
offset?: number;
},
/**
* To build the final name of the queue, add unique identifiers here, like:
* [tenantId.substring(0, 5), id.substring(0, 7)]
*/
queueNameGroups: (string | number)[] = [],
): Promise<void> {
if (
Expand All @@ -68,7 +110,7 @@ export class WorkflowScheduler {
return;
}

const queueName = ["eci", ...queueNameGroups, workflow.name].join(":");
const queueName = this.getQueueName(queueNameGroups, workflow.name);
if (this.scheduledWorkflows.includes(queueName)) {
throw new Error("Workflow is already scheduled");
}
Expand Down Expand Up @@ -101,7 +143,7 @@ export class WorkflowScheduler {
jobId: queueName,
repeat,
attempts: config?.attempts ?? 1,
keepLogs: 1000,
keepLogs: 2000,
backoff: {
type: "exponential",
delay: 60_000, // 1min, 2min, 4min...
Expand Down
74 changes: 69 additions & 5 deletions services/api/pages/api/saleor/webhook/v1/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import { extendContext, setupPrisma } from "@eci/pkg/webhook-context";
import { z } from "zod";
import { handleWebhook, Webhook } from "@eci/pkg/http";
import {
RedisConnection,
WorkflowScheduler,
} from "@eci/pkg/scheduler/scheduler";
import { env } from "@eci/pkg/env";
import { SaleorCustomerSyncWf } from "@eci/services/worker/src/workflows/saleorCustomerSync";
import { SaleorOrderSyncWf } from "@eci/services/worker/src/workflows/saleorOrderSync";
import { SaleorPaymentSyncWf } from "@eci/services/worker/src/workflows/saleorPaymentSync";

const requestValidation = z.object({
headers: z.object({
"x-schemabase-id": z.string(),
"saleor-domain": z.string(),
"saleor-event": z.enum([
"payment_list_gateways",
Expand All @@ -20,10 +27,11 @@ const requestValidation = z.object({
}),
});

/**
* The product data feed returns a google standard .csv file from products and
* their attributes in your shop.#
*/
const redisConnection: RedisConnection = {
host: env.require("REDIS_HOST"),
port: parseInt(env.require("REDIS_PORT")),
password: env.require("REDIS_PASSWORD"),
};
const webhook: Webhook<z.infer<typeof requestValidation>> = async ({
backgroundContext,
req,
Expand All @@ -42,6 +50,62 @@ const webhook: Webhook<z.infer<typeof requestValidation>> = async ({

ctx.logger.info(JSON.stringify(req.headers));

const saleorApp = await ctx.prisma.saleorApp.findFirst({
where: { domain: saleorDomain },
include: { installedSaleorApps: { where: { type: "entitysync" } } },
});

if (!saleorApp) {
ctx.logger.error("Saleor App not found", { saleorDomain });
res.status(404).send({ error: "Saleor App not found" });
return;
}
if (!saleorApp.installedSaleorApps.length) {
ctx.logger.error("No installed entitysync app found", { saleorDomain });
res.status(404).send({ error: "No installed apps found" });
return;
}

if (!saleorApp.tenantId) {
ctx.logger.error("TenantId not found", { saleorDomain });
res.status(404).send({ error: "TenantId not found" });
return;
}

const workflowScheduler = new WorkflowScheduler({
logger: ctx.logger,
redisConnection: redisConnection,
});

const installedSaleorApp = saleorApp.installedSaleorApps[0];

if (saleorEvent === "order_created") {
const commonQueueName = [
saleorApp.tenantId.substring(0, 5),
installedSaleorApp.id.substring(0, 7),
];
/**
* Promote the customer, order and payment workflow. Should be improved
* to a new ad-hoc job type, that is also having the dependencies
*/
await workflowScheduler.promoteJob(
commonQueueName,
SaleorCustomerSyncWf.name,
);
// wait 5 seconds
await new Promise((resolve) => setTimeout(resolve, 5000));
await workflowScheduler.promoteJob(
commonQueueName,
SaleorOrderSyncWf.name,
);
// wait 5 seconds
await new Promise((resolve) => setTimeout(resolve, 5000));
await workflowScheduler.promoteJob(
commonQueueName,
SaleorPaymentSyncWf.name,
);
}

res.send(req);
};

Expand Down

0 comments on commit 2747c72

Please sign in to comment.