Skip to content

Commit

Permalink
Merge pull request #337 from midday-ai/feature/jobs
Browse files Browse the repository at this point in the history
Feature/jobs
  • Loading branch information
pontusab authored Dec 3, 2024
2 parents c59d2a8 + 6353964 commit 7c62dff
Show file tree
Hide file tree
Showing 20 changed files with 320 additions and 211 deletions.
2 changes: 1 addition & 1 deletion apps/dashboard/jobs/tasks/bank/setup/initial.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { schedules, schemaTask } from "@trigger.dev/sdk/v3";
import { generateCronTag } from "jobs/utils/generate-cron-tag";
import { z } from "zod";
import { bankSyncScheduler } from "../scheduler/bank-sync";
import { bankSyncScheduler } from "../scheduler/bank-scheduler";
import { syncConnection } from "../sync/connection";

// This task sets up the bank sync for a new team on a daily schedule and
Expand Down
42 changes: 42 additions & 0 deletions apps/dashboard/jobs/tasks/rates/rates-scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { client } from "@midday/engine/client";
import { createClient } from "@midday/supabase/job";
import { logger, schedules } from "@trigger.dev/sdk/v3";
import { processBatch } from "jobs/utils/process-batch";

export const ratesScheduler = schedules.task({
id: "rates-scheduler",
cron: "0 0,12 * * *",
run: async () => {
// Only run in production (Set in Trigger.dev)
if (process.env.TRIGGER_ENVIRONMENT !== "production") return;

const supabase = createClient();

const ratesResponse = await client.rates.$get();

if (!ratesResponse.ok) {
logger.error("Failed to get rates");
throw new Error("Failed to get rates");
}

const { data: ratesData } = await ratesResponse.json();

const data = ratesData.flatMap((rate) => {
return Object.entries(rate.rates).map(([target, value]) => ({
base: rate.source,
target: target,
rate: value,
updated_at: rate.date,
}));
});

await processBatch(data, 500, async (batch) => {
await supabase.from("exchange_rates").upsert(batch, {
onConflict: "base, target",
ignoreDuplicates: false,
});

return batch;
});
},
});
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { createClient } from "@midday/supabase/job";
import { logger, schemaTask } from "@trigger.dev/sdk/v3";
import {
getAccountBalance,
getTransactionAmount,
} from "jobs/utils/base-currency";
import { processBatch } from "jobs/utils/process-batch";
import { z } from "zod";

const BATCH_LIMIT = 500;

export const updateAccountBaseCurrency = schemaTask({
id: "update-account-base-currency",
schema: z.object({
accountId: z.string().uuid(),
currency: z.string(),
balance: z.number(),
baseCurrency: z.string(),
}),
maxDuration: 300,
queue: {
concurrencyLimit: 10,
},
run: async ({ accountId, currency, balance, baseCurrency }) => {
const supabase = createClient();

const { data: exchangeRate } = await supabase
.from("exchange_rates")
.select("rate")
.eq("base", currency)
.eq("target", baseCurrency)
.single();

if (!exchangeRate) {
logger.info("No exchange rate found", {
currency,
baseCurrency,
});

return;
}

// Update account base balance and base currency
// based on the new currency exchange rate
await supabase
.from("bank_accounts")
.update({
base_balance: getAccountBalance({
currency: currency,
balance,
baseCurrency,
rate: exchangeRate.rate,
}),
base_currency: baseCurrency,
})
.eq("id", accountId);

const { data: transactionsData } = await supabase.rpc(
"get_all_transactions_by_account",
{
account_id: accountId,
},
);

const formattedTransactions = transactionsData?.map(
// Exclude fts_vector from the transaction object because it's a generated column
({ fts_vector, ...transaction }) => ({
...transaction,
base_amount: getTransactionAmount({
amount: transaction.amount,
currency: transaction.currency,
baseCurrency,
rate: exchangeRate?.rate,
}),
base_currency: baseCurrency,
}),
);

await processBatch(
formattedTransactions ?? [],
BATCH_LIMIT,
async (batch) => {
await supabase.from("transactions").upsert(batch, {
onConflict: "internal_id",
ignoreDuplicates: false,
});

return batch;
},
);
},
});
45 changes: 45 additions & 0 deletions apps/dashboard/jobs/tasks/transactions/update-base-currency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { createClient } from "@midday/supabase/job";
import { schemaTask } from "@trigger.dev/sdk/v3";
import { revalidateCache } from "jobs/utils/revalidate-cache";
import { triggerSequenceAndWait } from "jobs/utils/trigger-sequence";
import { z } from "zod";
import { updateAccountBaseCurrency } from "./update-account-base-currency";

export const updateBaseCurrency = schemaTask({
id: "update-base-currency",
schema: z.object({
teamId: z.string().uuid(),
baseCurrency: z.string(),
}),
maxDuration: 300,
queue: {
concurrencyLimit: 10,
},
run: async ({ teamId, baseCurrency }) => {
const supabase = createClient();

// Get all enabled accounts
const { data: accountsData } = await supabase
.from("bank_accounts")
.select("id, currency, balance")
.eq("team_id", teamId)
.eq("enabled", true);

if (!accountsData) {
return;
}

const formattedAccounts = accountsData.map((account) => ({
accountId: account.id,
currency: account.currency,
balance: account.balance,
baseCurrency,
}));

await triggerSequenceAndWait(formattedAccounts, updateAccountBaseCurrency, {
delayMinutes: 0,
});

await revalidateCache({ tag: "bank", id: teamId });
},
});
39 changes: 39 additions & 0 deletions apps/dashboard/jobs/utils/base-currency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
type GetAccountBalanceParams = {
currency: string;
balance: number;
baseCurrency: string;
rate: number | null;
};

export function getAccountBalance({
currency,
balance,
baseCurrency,
rate,
}: GetAccountBalanceParams) {
if (currency === baseCurrency) {
return balance;
}

return +(balance * (rate ?? 1)).toFixed(2);
}

type GetTransactionAmountParams = {
amount: number;
currency: string;
baseCurrency: string;
rate: number | null;
};

export function getTransactionAmount({
amount,
currency,
baseCurrency,
rate,
}: GetTransactionAmountParams) {
if (currency === baseCurrency) {
return amount;
}

return +(amount * (rate ?? 1)).toFixed(2);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"use server";

import { LogEvents } from "@midday/events/events";
import { Events, client } from "@midday/jobs";
import { updateTeam } from "@midday/supabase/mutations";
import { updateBaseCurrency } from "jobs/tasks/transactions/update-base-currency";
import { revalidatePath, revalidateTag } from "next/cache";
import { z } from "zod";
import { authActionClient } from "../safe-action";
Expand All @@ -22,6 +22,10 @@ export const updateCurrencyAction = authActionClient
})
.action(
async ({ parsedInput: { baseCurrency }, ctx: { user, supabase } }) => {
if (!user.team_id) {
throw new Error("No team id");
}

await updateTeam(supabase, {
id: user.team_id,
base_currency: baseCurrency,
Expand All @@ -30,12 +34,9 @@ export const updateCurrencyAction = authActionClient
revalidateTag(`team_settings_${user.team_id}`);
revalidatePath("/settings/accounts");

const event = await client.sendEvent({
name: Events.UPDATE_CURRENCY,
payload: {
baseCurrency,
teamId: user.team_id,
},
const event = await updateBaseCurrency.trigger({
teamId: user.team_id,
baseCurrency,
});

return event;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const cacheTags = {
"transactions",
"bank_connections",
"bank_accounts",
"insights",
"spending",
"bank_accounts_currencies",
"bank_accounts_balances",
Expand Down
29 changes: 15 additions & 14 deletions apps/dashboard/src/components/base-currency/select-currency.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,32 @@

import { updateCurrencyAction } from "@/actions/transactions/update-currency-action";
import { SelectCurrency as SelectCurrencyBase } from "@/components/select-currency";
import { useSyncStatus } from "@/hooks/use-sync-status";
import { uniqueCurrencies } from "@midday/location/currencies";
import { Button } from "@midday/ui/button";
import { useToast } from "@midday/ui/use-toast";
import { useEventDetails } from "@trigger.dev/react";
import { useAction } from "next-safe-action/hooks";
import { useEffect, useState } from "react";

export function SelectCurrency({ defaultValue }: { defaultValue: string }) {
const { toast } = useToast();
const [eventId, setEventId] = useState<string | undefined>();
const [isSyncing, setSyncing] = useState(false);
const { data } = useEventDetails(eventId);
const [runId, setRunId] = useState<string | undefined>();
const [accessToken, setAccessToken] = useState<string | undefined>();

const status = data?.runs.at(-1)?.status;

const error = status === "FAILURE" || status === "TIMED_OUT";
const { status, setStatus } = useSyncStatus({ runId, accessToken });

const updateCurrency = useAction(updateCurrencyAction, {
onExecute: () => setSyncing(true),
onSuccess: ({ data }) => {
if (data?.id) {
setEventId(data.id);
if (data) {
setRunId(data.id);
setAccessToken(data.publicAccessToken);
}
},
onError: () => {
setEventId(undefined);
setRunId(undefined);

toast({
duration: 3500,
variant: "error",
Expand Down Expand Up @@ -61,9 +61,10 @@ export function SelectCurrency({ defaultValue }: { defaultValue: string }) {
};

useEffect(() => {
if (status === "SUCCESS") {
if (status === "COMPLETED") {
setSyncing(false);
setEventId(undefined);
setStatus(null);
setRunId(undefined);
toast({
duration: 3500,
variant: "success",
Expand All @@ -84,17 +85,17 @@ export function SelectCurrency({ defaultValue }: { defaultValue: string }) {
}, [isSyncing]);

useEffect(() => {
if (error) {
if (status === "FAILED") {
setSyncing(false);
setEventId(undefined);
setRunId(undefined);

toast({
duration: 3500,
variant: "error",
title: "Something went wrong pleaase try again.",
});
}
}, [error]);
}, [status]);

return (
<div className="w-[200px]">
Expand Down
2 changes: 1 addition & 1 deletion apps/dashboard/src/hooks/use-initial-connection-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function useInitialConnectionStatus({
accessToken: initialAccessToken,
}: UseInitialConnectionStatusProps) {
const [accessToken, setAccessToken] = useState<string | undefined>(
initialAccessToken ?? "dummy",
initialAccessToken,
);
const [runId, setRunId] = useState<string | undefined>(initialRunId);
const [status, setStatus] = useState<
Expand Down
2 changes: 1 addition & 1 deletion apps/dashboard/src/hooks/use-sync-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function useSyncStatus({
accessToken: initialAccessToken,
}: UseSyncStatusProps) {
const [accessToken, setAccessToken] = useState<string | undefined>(
initialAccessToken ?? "dummy",
initialAccessToken,
);
const [runId, setRunId] = useState<string | undefined>(initialRunId);
const [status, setStatus] = useState<
Expand Down
Loading

0 comments on commit 7c62dff

Please sign in to comment.