From 16e6f83539c4d1c0831d32c1e94926345e8a7495 Mon Sep 17 00:00:00 2001 From: Pontus Abrahamsson Date: Mon, 2 Dec 2024 18:10:45 +0100 Subject: [PATCH] Export transactions --- .../invoice/operations/generate-invoice.ts | 8 +- .../jobs/tasks/transactions/export.ts | 124 ++++++++++ .../jobs/tasks/transactions/process.ts | 115 +++++++++ apps/dashboard/jobs/utils/blob.ts | 8 + .../src/actions/export-transactions-action.ts | 17 +- .../src/components/export-status.tsx | 49 ++-- .../dashboard/src/components/invoice/logo.tsx | 2 +- .../src/components/invoice/template.tsx | 9 - .../tables/transactions/export-bar.tsx | 10 +- apps/dashboard/src/hooks/use-export-status.ts | 64 +++++ .../hooks/use-initial-connection-status.ts | 1 + apps/dashboard/src/store/export.ts | 14 +- packages/jobs/src/constants.ts | 2 - packages/jobs/src/transactions/export.ts | 227 ------------------ packages/jobs/src/transactions/index.ts | 1 - 15 files changed, 365 insertions(+), 286 deletions(-) create mode 100644 apps/dashboard/jobs/tasks/transactions/export.ts create mode 100644 apps/dashboard/jobs/tasks/transactions/process.ts create mode 100644 apps/dashboard/jobs/utils/blob.ts delete mode 100644 apps/dashboard/src/components/invoice/template.tsx create mode 100644 apps/dashboard/src/hooks/use-export-status.ts delete mode 100644 packages/jobs/src/transactions/export.ts diff --git a/apps/dashboard/jobs/tasks/invoice/operations/generate-invoice.ts b/apps/dashboard/jobs/tasks/invoice/operations/generate-invoice.ts index 63b44754bc..ba7591b3f7 100644 --- a/apps/dashboard/jobs/tasks/invoice/operations/generate-invoice.ts +++ b/apps/dashboard/jobs/tasks/invoice/operations/generate-invoice.ts @@ -28,13 +28,7 @@ export const generateInvoice = schemaTask({ const { user, ...invoice } = invoiceData; - const buffer = await renderToBuffer( - await PdfTemplate({ - ...invoice, - timezone: user?.timezone, - locale: user?.locale, - }), - ); + const buffer = await renderToBuffer(await PdfTemplate(invoice)); const filename = `${invoiceData?.invoice_number}.pdf`; diff --git a/apps/dashboard/jobs/tasks/transactions/export.ts b/apps/dashboard/jobs/tasks/transactions/export.ts new file mode 100644 index 0000000000..7dae309094 --- /dev/null +++ b/apps/dashboard/jobs/tasks/transactions/export.ts @@ -0,0 +1,124 @@ +import { writeToString } from "@fast-csv/format"; +import { createClient } from "@midday/supabase/job"; +import { metadata, schemaTask } from "@trigger.dev/sdk/v3"; +import { BlobReader, BlobWriter, TextReader, ZipWriter } from "@zip.js/zip.js"; +import { serializableToBlob } from "jobs/utils/blob"; +import { z } from "zod"; +import { processTransactions } from "./process"; + +// Process transactions in batches of 100 +const BATCH_SIZE = 100; + +export const exportTransactions = schemaTask({ + id: "export-transactions", + schema: z.object({ + teamId: z.string().uuid(), + locale: z.string(), + transactionIds: z.array(z.string().uuid()), + }), + maxDuration: 300, + queue: { + concurrencyLimit: 10, + }, + run: async ({ teamId, locale, transactionIds }) => { + const supabase = createClient(); + + const filePath = `export-${new Date().toISOString()}`; + const path = `${teamId}/exports`; + const fileName = `${filePath}.zip`; + + metadata.set("progress", 20); + + // Process transactions in batches of 100 and collect results + // Update progress for each batch + const results = []; + + const totalBatches = Math.ceil(transactionIds.length / BATCH_SIZE); + const progressPerBatch = 60 / totalBatches; + let currentProgress = 20; + + for (let i = 0; i < transactionIds.length; i += BATCH_SIZE) { + const transactionBatch = transactionIds.slice(i, i + BATCH_SIZE); + + const batchResult = await processTransactions.triggerAndWait({ + ids: transactionBatch, + locale, + }); + + results.push(batchResult); + + currentProgress += progressPerBatch; + metadata.set("progress", Math.round(currentProgress)); + } + + const rows = results + .flatMap((r) => (r.ok ? r.output.rows : [])) + // Date is the first column + .sort( + (a, b) => + new Date(b[0] as string).getTime() - + new Date(a[0] as string).getTime(), + ); + + const attachments = results.flatMap((r) => + r.ok ? r.output.attachments : [], + ); + + const csv = await writeToString(rows, { + headers: [ + "Date", + "Description", + "Additional info", + "Amount", + "Currency", + "Formatted amount", + "VAT", + "Category", + "Category description", + "Status", + "Attachments", + "Balance", + "Account", + "Note", + ], + }); + + const zipFileWriter = new BlobWriter("application/zip"); + const zipWriter = new ZipWriter(zipFileWriter); + + zipWriter.add("transactions.csv", new TextReader(csv)); + + metadata.set("progress", 90); + + // Add attachments to zip + attachments?.map((attachment) => { + if (attachment.blob) { + zipWriter.add( + attachment.name, + new BlobReader(serializableToBlob(attachment.blob)), + ); + } + }); + + const zip = await zipWriter.close(); + + metadata.set("progress", 95); + + await supabase.storage + .from("vault") + .upload(`${path}/${fileName}`, await zip.arrayBuffer(), { + upsert: true, + contentType: "application/zip", + }); + + // revalidateTag(`vault_${teamId}`); + + metadata.set("progress", 100); + + return { + filePath, + fileName, + totalItems: rows.length, + }; + }, +}); diff --git a/apps/dashboard/jobs/tasks/transactions/process.ts b/apps/dashboard/jobs/tasks/transactions/process.ts new file mode 100644 index 0000000000..6898eff647 --- /dev/null +++ b/apps/dashboard/jobs/tasks/transactions/process.ts @@ -0,0 +1,115 @@ +import { createClient } from "@midday/supabase/job"; +import { download } from "@midday/supabase/storage"; +import { schemaTask } from "@trigger.dev/sdk/v3"; +import { blobToSerializable } from "jobs/utils/blob"; +import { processBatch } from "jobs/utils/process-batch"; +import { z } from "zod"; + +const ATTACHMENT_BATCH_SIZE = 20; + +export const processTransactions = schemaTask({ + id: "process-transactions", + schema: z.object({ + ids: z.array(z.string().uuid()), + locale: z.string(), + }), + maxDuration: 300, + queue: { + concurrencyLimit: 5, + }, + run: async ({ ids, locale }) => { + const supabase = createClient(); + + const { data: transactionsData } = await supabase + .from("transactions") + .select(` + id, + date, + name, + description, + amount, + note, + balance, + currency, + vat:calculated_vat, + attachments:transaction_attachments(*), + category:transaction_categories(id, name, description), + bank_account:bank_accounts(id, name) + `) + .in("id", ids) + .throwOnError(); + + const attachments = await processBatch( + transactionsData ?? [], + ATTACHMENT_BATCH_SIZE, + async (batch) => { + const batchAttachments = await Promise.all( + batch.flatMap((transaction, idx) => { + const rowId = idx + 1; + return (transaction.attachments ?? []).map( + async (attachment, idx2: number) => { + const filename = attachment.name?.split(".").at(0); + const extension = attachment.name?.split(".").at(-1); + + const name = + idx2 > 0 + ? `${filename}-${rowId}_${idx2}.${extension}` + : `${filename}-${rowId}.${extension}`; + + const { data } = await download(supabase, { + bucket: "vault", + path: (attachment.path ?? []).join("/"), + }); + + return { + id: transaction.id, + name, + blob: data ? await blobToSerializable(data) : null, + }; + }, + ); + }), + ); + + return batchAttachments.flat(); + }, + ); + + const rows = transactionsData + ?.sort((a, b) => new Date(a.date).getTime() - new Date(b.date).getTime()) + .map((transaction) => [ + transaction.date, + transaction.name, + transaction.description, + transaction.amount, + transaction.currency, + Intl.NumberFormat(locale, { + style: "currency", + currency: transaction.currency, + }).format(transaction.amount), + transaction?.vat + ? Intl.NumberFormat(locale, { + style: "currency", + currency: transaction.currency, + }).format(transaction?.vat) + : "", + transaction?.category?.name ?? "", + transaction?.category?.description ?? "", + transaction?.attachments?.length > 0 ? "✔️" : "❌", + + attachments + .filter((a) => a.id === transaction.id) + .map((a) => a.name) + .join(", ") ?? "", + + transaction?.balance ?? "", + transaction?.bank_account?.name ?? "", + transaction?.note ?? "", + ]); + + return { + rows: rows ?? [], + attachments: attachments ?? [], + }; + }, +}); diff --git a/apps/dashboard/jobs/utils/blob.ts b/apps/dashboard/jobs/utils/blob.ts new file mode 100644 index 0000000000..6415becb40 --- /dev/null +++ b/apps/dashboard/jobs/utils/blob.ts @@ -0,0 +1,8 @@ +export async function blobToSerializable(blob: Blob) { + const arrayBuffer = await blob.arrayBuffer(); + return Array.from(new Uint8Array(arrayBuffer)); +} + +export function serializableToBlob(array: number[], contentType = "") { + return new Blob([new Uint8Array(array)], { type: contentType }); +} diff --git a/apps/dashboard/src/actions/export-transactions-action.ts b/apps/dashboard/src/actions/export-transactions-action.ts index 0c983df11e..b29f82de36 100644 --- a/apps/dashboard/src/actions/export-transactions-action.ts +++ b/apps/dashboard/src/actions/export-transactions-action.ts @@ -1,7 +1,7 @@ "use server"; import { LogEvents } from "@midday/events/events"; -import { Events, client } from "@midday/jobs"; +import { exportTransactions } from "jobs/tasks/transactions/export"; import { authActionClient } from "./safe-action"; import { exportTransactionsSchema } from "./schema"; @@ -15,13 +15,14 @@ export const exportTransactionsAction = authActionClient }, }) .action(async ({ parsedInput: transactionIds, ctx: { user } }) => { - const event = await client.sendEvent({ - name: Events.TRANSACTIONS_EXPORT, - payload: { - transactionIds, - teamId: user.team_id, - locale: user.locale, - }, + if (!user.team_id || !user.locale) { + throw new Error("User not found"); + } + + const event = await exportTransactions.trigger({ + teamId: user.team_id, + locale: user.locale, + transactionIds, }); return event; diff --git a/apps/dashboard/src/components/export-status.tsx b/apps/dashboard/src/components/export-status.tsx index 592c397f9a..459d4e7092 100644 --- a/apps/dashboard/src/components/export-status.tsx +++ b/apps/dashboard/src/components/export-status.tsx @@ -1,6 +1,7 @@ "use client"; import { shareFileAction } from "@/actions/share-file-action"; +import { useExportStatus } from "@/hooks/use-export-status"; import { useExportStore } from "@/store/export"; import { Button } from "@midday/ui/button"; import { @@ -11,7 +12,6 @@ import { } from "@midday/ui/dropdown-menu"; import { Icons } from "@midday/ui/icons"; import { useToast } from "@midday/ui/use-toast"; -import { useEventRunStatuses } from "@trigger.dev/react"; import ms from "ms"; import { useAction } from "next-safe-action/hooks"; import { useEffect, useState } from "react"; @@ -34,9 +34,8 @@ const options = [ export function ExportStatus() { const { toast, dismiss, update } = useToast(); const [toastId, setToastId] = useState(null); - const { exportId, setExportId } = useExportStore(); - const { error, statuses } = useEventRunStatuses(exportId); - const status = statuses?.at(0); + const { exportData, setExportData } = useExportStore(); + const { status, progress, result } = useExportStatus(exportData); const shareFile = useAction(shareFileAction, { onError: () => { @@ -67,7 +66,20 @@ export function ExportStatus() { }; useEffect(() => { - if (exportId && !toastId) { + if (status === "FAILED") { + toast({ + duration: 2500, + variant: "error", + title: "Something went wrong please try again.", + }); + + setToastId(null); + setExportData(undefined); + } + }, [status]); + + useEffect(() => { + if (exportData && !toastId) { const { id } = toast({ title: "Exporting transactions.", variant: "progress", @@ -79,14 +91,14 @@ export function ExportStatus() { setToastId(id); } else { update(toastId, { - progress: status?.data?.progress, + progress, }); } - if (status?.data?.progress === 100) { + if (status === "COMPLETED" && result) { update(toastId, { title: "Export completed", - description: `Your export is ready based on ${status?.data?.totalItems} transactions. It's stored in your Vault.`, + description: `Your export is ready based on ${result.totalItems} transactions. It's stored in your Vault.`, variant: "success", footer: (
@@ -108,7 +120,7 @@ export function ExportStatus() { onClick={() => handleOnShare({ expireIn: option.expireIn, - filename: status?.data?.fileName, + filename: result.fileName, }) } > @@ -119,7 +131,7 @@ export function ExportStatus() {
; -} diff --git a/apps/dashboard/src/components/tables/transactions/export-bar.tsx b/apps/dashboard/src/components/tables/transactions/export-bar.tsx index 3dc899840e..f319a96c21 100644 --- a/apps/dashboard/src/components/tables/transactions/export-bar.tsx +++ b/apps/dashboard/src/components/tables/transactions/export-bar.tsx @@ -15,13 +15,19 @@ type Props = { export function ExportBar({ selected, deselectAll }: Props) { const { toast } = useToast(); - const { setExportId } = useExportStore(); + const { setExportData } = useExportStore(); const { rowSelection } = useTransactionsStore(); const [isOpen, setOpen] = useState(false); const { execute, status } = useAction(exportTransactionsAction, { onSuccess: ({ data }) => { - setExportId(data?.id); + if (data?.id && data?.publicAccessToken) { + setExportData({ + runId: data.id, + accessToken: data.publicAccessToken, + }); + } + setOpen(false); }, onError: () => { diff --git a/apps/dashboard/src/hooks/use-export-status.ts b/apps/dashboard/src/hooks/use-export-status.ts new file mode 100644 index 0000000000..7e036b0672 --- /dev/null +++ b/apps/dashboard/src/hooks/use-export-status.ts @@ -0,0 +1,64 @@ +import { useRealtimeRun } from "@trigger.dev/react-hooks"; +import { useEffect, useState } from "react"; + +type UseExportStatusProps = { + runId?: string; + accessToken?: string; +}; + +export function useExportStatus({ + runId: initialRunId, + accessToken: initialAccessToken, +}: UseExportStatusProps = {}) { + const [accessToken, setAccessToken] = useState( + initialAccessToken, + ); + const [runId, setRunId] = useState(initialRunId); + const [status, setStatus] = useState< + "FAILED" | "IN_PROGRESS" | "COMPLETED" | null + >(null); + + const [_, setProgress] = useState(0); + + const [result, setResult] = useState(null); + + const { run, error } = useRealtimeRun(runId, { + enabled: !!runId && !!accessToken, + accessToken, + }); + + console.log(run); + + useEffect(() => { + if (initialRunId && initialAccessToken) { + setAccessToken(initialAccessToken); + setRunId(initialRunId); + setStatus("IN_PROGRESS"); + } + }, [initialRunId, initialAccessToken]); + + useEffect(() => { + if (error || run?.status === "FAILED") { + setStatus("FAILED"); + setProgress(0); + } + + if (run?.status === "COMPLETED") { + setStatus("COMPLETED"); + setProgress(100); + } + }, [error, run]); + + useEffect(() => { + if (run?.output) { + setResult(run.output); + } + }, [run]); + + return { + status, + setStatus, + progress: run?.metadata?.progress ?? 0, + result, + }; +} diff --git a/apps/dashboard/src/hooks/use-initial-connection-status.ts b/apps/dashboard/src/hooks/use-initial-connection-status.ts index 51d53dc375..f1ca461edc 100644 --- a/apps/dashboard/src/hooks/use-initial-connection-status.ts +++ b/apps/dashboard/src/hooks/use-initial-connection-status.ts @@ -17,6 +17,7 @@ export function useInitialConnectionStatus({ const [status, setStatus] = useState< "FAILED" | "SYNCING" | "COMPLETED" | null >(null); + const { run, error } = useRealtimeRun(runId, { enabled: !!runId && !!accessToken, accessToken, diff --git a/apps/dashboard/src/store/export.ts b/apps/dashboard/src/store/export.ts index e067aefe92..b3cc194fc7 100644 --- a/apps/dashboard/src/store/export.ts +++ b/apps/dashboard/src/store/export.ts @@ -1,11 +1,17 @@ import { create } from "zustand"; interface ExportState { - exportId?: string; - setExportId: (exportId?: string) => void; + exportData?: { + runId?: string; + accessToken?: string; + }; + setExportData: (exportData?: { + runId?: string; + accessToken?: string; + }) => void; } export const useExportStore = create()((set) => ({ - exportId: undefined, - setExportId: (exportId) => set({ exportId }), + exportData: undefined, + setExportData: (exportData) => set({ exportData }), })); diff --git a/packages/jobs/src/constants.ts b/packages/jobs/src/constants.ts index e8ce9a9faa..bcc25d1673 100644 --- a/packages/jobs/src/constants.ts +++ b/packages/jobs/src/constants.ts @@ -1,5 +1,4 @@ export const Jobs = { - TRANSACTIONS_EXPORT: "transactions-export", INBOX_DOCUMENT: "inbox-document", INBOX_MATCH: "inbox-match", INBOX_UPLOAD: "inbox-upload", @@ -10,7 +9,6 @@ export const Jobs = { }; export const Events = { - TRANSACTIONS_EXPORT: "transactions.export", INBOX_DOCUMENT: "inbox.document", INBOX_MATCH: "inbox.match", TRANSACTIONS_IMPORT: "transactions.import", diff --git a/packages/jobs/src/transactions/export.ts b/packages/jobs/src/transactions/export.ts deleted file mode 100644 index 28cc24dc5f..0000000000 --- a/packages/jobs/src/transactions/export.ts +++ /dev/null @@ -1,227 +0,0 @@ -import { writeToString } from "@fast-csv/format"; -import { download } from "@midday/supabase/storage"; -import { eventTrigger } from "@trigger.dev/sdk"; -import { BlobReader, BlobWriter, TextReader, ZipWriter } from "@zip.js/zip.js"; -import { revalidateTag } from "next/cache"; -import { z } from "zod"; -import { client, supabase } from "../client"; -import { Events, Jobs } from "../constants"; - -client.defineJob({ - id: Jobs.TRANSACTIONS_EXPORT, - name: "Transactions - Export", - version: "0.0.1", - trigger: eventTrigger({ - name: Events.TRANSACTIONS_EXPORT, - schema: z.object({ - transactionIds: z.array(z.string()), - teamId: z.string(), - locale: z.string(), - }), - }), - integrations: { supabase }, - run: async (payload, io) => { - const client = await io.supabase.client; - - const { transactionIds, teamId, locale } = payload; - - const filePath = `export-${new Date().toISOString()}`; - - const path = `${teamId}/exports`; - const fileName = `${filePath}.zip`; - - const generateExport = await io.createStatus("generate-export-start", { - label: "Generating export", - state: "loading", - data: { - progress: 10, - }, - }); - - const { data, count } = await client - .from("transactions") - .select( - ` - id, - date, - name, - amount, - note, - balance, - currency, - vat:calculated_vat, - attachments:transaction_attachments(*), - category:transaction_categories(id, name, description), - bank_account:bank_accounts(id, name) - `, - { count: "exact" }, - ) - .in("id", transactionIds) - .eq("team_id", teamId); - - await generateExport.update("generate-export-transaction", { - state: "loading", - data: { - progress: 30, - }, - }); - - await generateExport.update("generate-export-attachments-start", { - state: "loading", - data: { - progress: 50, - }, - }); - - const attachments = await Promise.allSettled( - (data ?? []).flatMap((transaction, idx) => { - const rowId = idx + 1; - - return (transaction.attachments ?? []).map( - async (attachment, idx2: number) => { - const filename = attachment.name?.split(".").at(0); - const extension = attachment.name?.split(".").at(-1); - - const name = - idx2 > 0 - ? `${filename}-${rowId}_${idx2}.${extension}` - : `${filename}-${rowId}.${extension}`; - - const { data } = await download(client, { - bucket: "vault", - path: (attachment.path ?? []).join("/"), - }); - - return { - id: transaction.id, - name, - blob: data, - }; - }, - ); - }), - ); - - await generateExport.update("generate-export-attachments-end", { - state: "loading", - data: { - progress: 70, - }, - }); - - await generateExport.update("generate-export-csv-start", { - state: "loading", - data: { - progress: 75, - }, - }); - - const rows = data - ?.sort((a, b) => new Date(a.date).getTime() - new Date(b.date).getTime()) - .map((transaction) => [ - transaction.date, - transaction.name, - transaction.amount, - transaction.currency, - Intl.NumberFormat(locale, { - style: "currency", - currency: transaction.currency, - }).format(transaction.amount), - transaction?.vat - ? Intl.NumberFormat(locale, { - style: "currency", - currency: transaction.currency, - }).format(transaction?.vat) - : "", - transaction?.category?.name ?? "", - transaction?.category?.description ?? "", - transaction?.attachments?.length > 0 ? "✔️" : "❌", - - attachments - .filter( - (a) => a.status === "fulfilled" && a.value?.id === transaction.id, - ) - .map((a) => a.value?.name) - .filter(Boolean) - .join(", ") ?? "", - - transaction?.balance ?? "", - transaction?.bank_account?.name ?? "", - transaction?.note ?? "", - ]); - - const csv = await writeToString(rows, { - headers: [ - "Date", - "Description", - "Amount", - "Currency", - "Formatted amount", - "VAT", - "Category", - "Category description", - "Status", - "Attachments", - "Balance", - "Account", - "Note", - ], - }); - - await generateExport.update("generate-export-csv-end", { - state: "loading", - data: { - progress: 80, - }, - }); - - await generateExport.update("generate-export-zip", { - state: "loading", - data: { - progress: 85, - }, - }); - - const zipFileWriter = new BlobWriter("application/zip"); - const zipWriter = new ZipWriter(zipFileWriter); - - zipWriter.add("transactions.csv", new TextReader(csv)); - - attachments?.map((attachment) => { - if (attachment?.value?.blob) { - zipWriter.add( - attachment.value.name, - new BlobReader(attachment.value.blob), - ); - } - }); - - const zip = await zipWriter.close(); - - await generateExport.update("generate-export-upload", { - state: "loading", - data: { - progress: 90, - }, - }); - - await client.storage - .from("vault") - .upload(`${path}/${fileName}`, await zip.arrayBuffer(), { - upsert: true, - contentType: "application/zip", - }); - - revalidateTag(`vault_${teamId}`); - - await generateExport.update("generate-export-done", { - state: "success", - data: { - filePath, - fileName, - progress: 100, - totalItems: count, - }, - }); - }, -}); diff --git a/packages/jobs/src/transactions/index.ts b/packages/jobs/src/transactions/index.ts index f638789145..3cbc23a690 100644 --- a/packages/jobs/src/transactions/index.ts +++ b/packages/jobs/src/transactions/index.ts @@ -1,3 +1,2 @@ -export * from "./export"; export * from "./import"; export * from "./update-currency";