Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pontusab committed Nov 26, 2024
1 parent 1cb2ced commit 09df7b7
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 65 deletions.
52 changes: 29 additions & 23 deletions apps/dashboard/jobs/tasks/bank/notifications/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createClient } from "@midday/supabase/job";
import { schemaTask } from "@trigger.dev/sdk/v3";
import { logger, schemaTask } from "@trigger.dev/sdk/v3";
import {
handleTransactionEmails,
handleTransactionSlackNotifications,
Expand All @@ -9,36 +9,42 @@ import { z } from "zod";

export const transactionsNotification = schemaTask({
id: "transactions-notification",
maxDuration: 300,
schema: z.object({
teamId: z.string(),
}),
run: async ({ teamId }) => {
const supabase = createClient();

// Mark all transactions as processed and get the ones that need to be notified about
const { data: transactionsData } = await supabase
.from("transactions")
.update({ processed: true })
.eq("team_id", teamId)
.eq("processed", false)
.select("id, date, amount, name, currency, category, status")
.throwOnError();
try {
// Mark all transactions as processed and get the ones that need to be notified about
const { data: transactionsData } = await supabase
.from("transactions")
.update({ processed: true })
.eq("team_id", teamId)
.eq("processed", false)
.select("id, date, amount, name, currency, category, status")
.order("date", { ascending: false })
.throwOnError();

const { data: usersData } = await supabase
.from("users_on_team")
.select(
"id, team_id, team:teams(inbox_id, name), user:users(id, full_name, avatar_url, email, locale)",
)
.eq("team_id", teamId)
.eq("role", "owner")
.throwOnError();
const { data: usersData } = await supabase
.from("users_on_team")
.select(
"id, team_id, team:teams(inbox_id, name), user:users(id, full_name, avatar_url, email, locale)",
)
.eq("team_id", teamId)
.eq("role", "owner")
.throwOnError();

const sortedTransactions = transactionsData?.sort(
(a, b) => new Date(b.date).getTime() - new Date(a.date).getTime(),
);
if (transactionsData && transactionsData.length > 0) {
await handleTransactionNotifications(usersData, transactionsData);
await handleTransactionEmails(usersData, transactionsData);
await handleTransactionSlackNotifications(teamId, transactionsData);
}
} catch (error) {
await logger.error("Transactions notification", { error });

await handleTransactionNotifications(usersData, sortedTransactions);
await handleTransactionEmails(usersData, sortedTransactions);
await handleTransactionSlackNotifications(teamId, sortedTransactions);
throw error;
}
},
});
1 change: 1 addition & 0 deletions apps/dashboard/jobs/tasks/bank/scheduler/bank-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { syncConnection } from "../sync/connection";
// that has a status of "connected".
export const bankSyncScheduler = schedules.task({
id: "bank-sync-scheduler",
maxDuration: 600,
run: async (payload) => {
const supabase = createClient();

Expand Down
2 changes: 1 addition & 1 deletion apps/dashboard/jobs/tasks/bank/setup/initial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const initialBankSetup = schemaTask({
teamId: z.string().uuid(),
connectionId: z.string().uuid(),
}),
maxDuration: 600,
maxDuration: 300,
queue: {
concurrencyLimit: 20,
},
Expand Down
1 change: 1 addition & 0 deletions apps/dashboard/jobs/tasks/bank/sync/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const BATCH_SIZE = 500;

export const syncAccount = schemaTask({
id: "sync-account",
maxDuration: 300,
retry: {
maxAttempts: 2,
},
Expand Down
3 changes: 3 additions & 0 deletions apps/dashboard/jobs/tasks/bank/sync/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { syncAccount } from "./account";
// Fan-out pattern. We want to trigger a task for each bank account (Transactions, Balance)
export const syncConnection = schemaTask({
id: "sync-connection",
maxDuration: 300,
retry: {
maxAttempts: 2,
},
Expand Down Expand Up @@ -41,6 +42,8 @@ export const syncConnection = schemaTask({
},
});

logger.info("Connection response", { connectionResponse });

if (!connectionResponse.ok) {
logger.error("Failed to get connection status");
throw new Error("Failed to get connection status");
Expand Down
2 changes: 1 addition & 1 deletion apps/dashboard/jobs/tasks/bank/transactions/upsert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const transactionSchema = z.object({

export const upsertTransactions = schemaTask({
id: "upsert-transactions",
maxDuration: 20000,
maxDuration: 300,
queue: {
concurrencyLimit: 10,
},
Expand Down
23 changes: 13 additions & 10 deletions apps/dashboard/jobs/utils/transaction-notifications.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { sendSlackTransactionsNotification } from "@midday/app-store/slack";
import { sendSlackTransactionsNotification } from "@midday/app-store/slack-notifications";
import TransactionsEmail from "@midday/email/emails/transactions";
import { getI18n } from "@midday/email/locales";
import { getInboxEmail } from "@midday/inbox";
Expand All @@ -7,6 +7,7 @@ import {
TriggerEvents,
triggerBulk,
} from "@midday/notification";
import { createClient } from "@midday/supabase/job";
import { render } from "@react-email/components";
import { logger } from "@trigger.dev/sdk/v3";

Expand Down Expand Up @@ -41,7 +42,7 @@ interface Transaction {

export async function handleTransactionNotifications(
usersData: UserData[],
sortedTransactions: Transaction[],
transactions: Transaction[],
) {
const notificationEvents = usersData.map(({ user, team_id }) => {
const { t } = getI18n({ locale: user.locale ?? "en" });
Expand All @@ -50,10 +51,10 @@ export async function handleTransactionNotifications(
name: TriggerEvents.TransactionsNewInApp,
payload: {
type: NotificationTypes.Transactions,
from: sortedTransactions[sortedTransactions.length - 1]?.date,
to: sortedTransactions[0]?.date,
from: transactions[transactions.length - 1]?.date,
to: transactions[0]?.date,
description: t("notifications.transactions", {
numberOfTransactions: sortedTransactions.length,
numberOfTransactions: transactions.length,
}),
},
user: {
Expand All @@ -79,15 +80,15 @@ export async function handleTransactionNotifications(

export async function handleTransactionEmails(
usersData: UserData[],
sortedTransactions: Transaction[],
transactions: Transaction[],
) {
const emailPromises = usersData.map(async ({ user, team_id, team }) => {
const { t } = getI18n({ locale: user.locale ?? "en" });

const html = await render(
TransactionsEmail({
fullName: user.full_name,
transactions: sortedTransactions,
transactions,
locale: user.locale ?? "en",
teamName: team.name,
}),
Expand Down Expand Up @@ -123,11 +124,12 @@ export async function handleTransactionEmails(

export async function handleTransactionSlackNotifications(
teamId: string,
sortedTransactions: Transaction[],
transactions: Transaction[],
) {
const supabase = createClient();

// TODO: Get correct locale for formatting the amount
const slackTransactions = sortedTransactions.map((transaction) => ({
date: transaction.date,
const slackTransactions = transactions.map((transaction) => ({
amount: Intl.NumberFormat("en-US", {
style: "currency",
currency: transaction.currency,
Expand All @@ -138,5 +140,6 @@ export async function handleTransactionSlackNotifications(
await sendSlackTransactionsNotification({
teamId,
transactions: slackTransactions,
supabase,
});
}
4 changes: 2 additions & 2 deletions apps/engine/src/providers/gocardless/gocardless-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export class GoCardLessApi {
#baseUrl = "https://bankaccountdata.gocardless.com";

// Cache keys
#accessTokenCacheKey = "gocardless_access_token";
#refreshTokenCacheKey = "gocardless_refresh_token";
#accessTokenCacheKey = "gocardless_access_token_v2";
#refreshTokenCacheKey = "gocardless_refresh_token_v2";
#institutionsCacheKey = "gocardless_institutions";

#kv: KVNamespace;
Expand Down
1 change: 1 addition & 0 deletions packages/app-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"exports": {
".": "./src/index.ts",
"./slack": "./src/slack/index.ts",
"./slack-notifications": "./src/slack/lib/notifications/transactions.ts",
"./db": "./src/db/index.ts"
}
}
24 changes: 2 additions & 22 deletions packages/app-store/src/slack/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,9 @@ const SLACK_OAUTH_REDIRECT_URL =
const SLACK_STATE_SECRET = process.env.NEXT_PUBLIC_SLACK_STATE_SECRET;
const SLACK_SIGNING_SECRET = process.env.SLACK_SIGNING_SECRET;

if (!SLACK_CLIENT_ID) {
throw new Error("SLACK_CLIENT_ID is not defined");
}

if (!SLACK_CLIENT_SECRET) {
throw new Error("SLACK_CLIENT_SECRET is not defined");
}

if (!SLACK_OAUTH_REDIRECT_URL) {
throw new Error("SLACK_OAUTH_REDIRECT_URL is not defined");
}

if (!SLACK_STATE_SECRET) {
throw new Error("SLACK_STATE_SECRET is not defined");
}

if (!SLACK_SIGNING_SECRET) {
throw new Error("SLACK_SIGNING_SECRET is not defined");
}

export const slackInstaller = new InstallProvider({
clientId: SLACK_CLIENT_ID,
clientSecret: SLACK_CLIENT_SECRET,
clientId: SLACK_CLIENT_ID!,
clientSecret: SLACK_CLIENT_SECRET!,
stateSecret: SLACK_STATE_SECRET,
logLevel: process.env.NODE_ENV === "development" ? LogLevel.DEBUG : undefined,
});
Expand Down
10 changes: 4 additions & 6 deletions packages/app-store/src/slack/lib/notifications/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import { createClient } from "@midday/supabase/server";
import type { SupabaseClient } from "@supabase/supabase-js";
import { z } from "zod";
import config from "../../config";
import { createSlackWebClient } from "../client";

const transactionSchema = z.object({
date: z.coerce.date(),
amount: z.string(),
name: z.string(),
});

export async function sendSlackTransactionsNotification({
teamId,
transactions,
supabase,
}: {
teamId: string;
transactions: z.infer<typeof transactionSchema>[];
supabase: SupabaseClient;
}) {
const supabase = createClient({ admin: true });

const { data } = await supabase
.from("apps")
.select("settings, config")
.eq("team_id", teamId)
.eq("app_id", config.id)
.eq("app_id", "slack")
.single();

const enabled = data?.settings?.find(
Expand Down
7 changes: 7 additions & 0 deletions packages/supabase/src/client/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,11 @@ export const createClient = () =>
createSupabaseClient<Database>(
process.env.NEXT_PUBLIC_SUPABASE_URL!,
process.env.SUPABASE_SERVICE_KEY!,
{
global: {
headers: {
"sb-lb-routing-mode": "alpha-all-services",
},
},
},
);

0 comments on commit 09df7b7

Please sign in to comment.