Skip to content

Commit

Permalink
Export transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
pontusab committed Dec 2, 2024
1 parent 1767114 commit 16e6f83
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ export const generateInvoice = schemaTask({

const { user, ...invoice } = invoiceData;

const buffer = await renderToBuffer(
await PdfTemplate({
...invoice,
timezone: user?.timezone,
locale: user?.locale,
}),
);
const buffer = await renderToBuffer(await PdfTemplate(invoice));

const filename = `${invoiceData?.invoice_number}.pdf`;

Expand Down
124 changes: 124 additions & 0 deletions apps/dashboard/jobs/tasks/transactions/export.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { writeToString } from "@fast-csv/format";
import { createClient } from "@midday/supabase/job";
import { metadata, schemaTask } from "@trigger.dev/sdk/v3";
import { BlobReader, BlobWriter, TextReader, ZipWriter } from "@zip.js/zip.js";
import { serializableToBlob } from "jobs/utils/blob";
import { z } from "zod";
import { processTransactions } from "./process";

// Number of transaction ids handed to each `process-transactions` child run.
const BATCH_SIZE = 100;

/**
 * Exports the given transactions as a zip archive (CSV + attachments)
 * uploaded to the team's vault bucket. Progress is reported through run
 * metadata (20 → 100).
 */
export const exportTransactions = schemaTask({
  id: "export-transactions",
  schema: z.object({
    teamId: z.string().uuid(),
    locale: z.string(),
    transactionIds: z.array(z.string().uuid()),
  }),
  maxDuration: 300,
  queue: {
    concurrencyLimit: 10,
  },
  run: async ({ teamId, locale, transactionIds }) => {
    const supabase = createClient();

    const filePath = `export-${new Date().toISOString()}`;
    const path = `${teamId}/exports`;
    const fileName = `${filePath}.zip`;

    metadata.set("progress", 20);

    // Fan the ids out to `process-transactions` in batches of BATCH_SIZE,
    // collecting the results and advancing progress from 20 to 80.
    const results = [];

    // Guard against division by zero when transactionIds is empty.
    const totalBatches = Math.max(
      1,
      Math.ceil(transactionIds.length / BATCH_SIZE),
    );
    const progressPerBatch = 60 / totalBatches;
    let currentProgress = 20;

    for (let i = 0; i < transactionIds.length; i += BATCH_SIZE) {
      const transactionBatch = transactionIds.slice(i, i + BATCH_SIZE);

      const batchResult = await processTransactions.triggerAndWait({
        ids: transactionBatch,
        locale,
      });

      results.push(batchResult);

      currentProgress += progressPerBatch;
      metadata.set("progress", Math.round(currentProgress));
    }

    // NOTE(review): failed batches (r.ok === false) are silently dropped,
    // so the export may be incomplete with no signal to the user —
    // consider failing the run or reporting skipped batches.
    const rows = results
      .flatMap((r) => (r.ok ? r.output.rows : []))
      // Date is the first column; newest first.
      .sort(
        (a, b) =>
          new Date(b[0] as string).getTime() -
          new Date(a[0] as string).getTime(),
      );

    const attachments = results.flatMap((r) =>
      r.ok ? r.output.attachments : [],
    );

    const csv = await writeToString(rows, {
      headers: [
        "Date",
        "Description",
        "Additional info",
        "Amount",
        "Currency",
        "Formatted amount",
        "VAT",
        "Category",
        "Category description",
        "Status",
        "Attachments",
        "Balance",
        "Account",
        "Note",
      ],
    });

    const zipFileWriter = new BlobWriter("application/zip");
    const zipWriter = new ZipWriter(zipFileWriter);

    // ZipWriter#add returns a Promise — every entry must settle before
    // close(), otherwise entries can be lost or the writer can reject.
    // The original fired these via an unawaited .map(), which is racy.
    await zipWriter.add("transactions.csv", new TextReader(csv));

    metadata.set("progress", 90);

    // Add each downloaded attachment to the zip.
    for (const attachment of attachments) {
      if (attachment.blob) {
        await zipWriter.add(
          attachment.name,
          new BlobReader(serializableToBlob(attachment.blob)),
        );
      }
    }

    const zip = await zipWriter.close();

    metadata.set("progress", 95);

    // Upload the archive to the team's vault; upsert overwrites a stale
    // export with the same name.
    await supabase.storage
      .from("vault")
      .upload(`${path}/${fileName}`, await zip.arrayBuffer(), {
        upsert: true,
        contentType: "application/zip",
      });

    // revalidateTag(`vault_${teamId}`);

    metadata.set("progress", 100);

    return {
      filePath,
      fileName,
      totalItems: rows.length,
    };
  },
});
115 changes: 115 additions & 0 deletions apps/dashboard/jobs/tasks/transactions/process.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { createClient } from "@midday/supabase/job";
import { download } from "@midday/supabase/storage";
import { schemaTask } from "@trigger.dev/sdk/v3";
import { blobToSerializable } from "jobs/utils/blob";
import { processBatch } from "jobs/utils/process-batch";
import { z } from "zod";

// How many transactions have their attachments downloaded concurrently.
const ATTACHMENT_BATCH_SIZE = 20;

/**
 * Loads a batch of transactions with their categories, bank accounts and
 * attachments, downloads the attachment files from the vault, and returns
 * CSV-ready rows plus serialized attachment blobs for the export task.
 */
export const processTransactions = schemaTask({
  id: "process-transactions",
  schema: z.object({
    ids: z.array(z.string().uuid()),
    locale: z.string(),
  }),
  maxDuration: 300,
  queue: {
    concurrencyLimit: 5,
  },
  run: async ({ ids, locale }) => {
    const supabase = createClient();

    const { data: transactionsData } = await supabase
      .from("transactions")
      .select(`
        id,
        date,
        name,
        description,
        amount,
        note,
        balance,
        currency,
        vat:calculated_vat,
        attachments:transaction_attachments(*),
        category:transaction_categories(id, name, description),
        bank_account:bank_accounts(id, name)
      `)
      .in("id", ids)
      .throwOnError();

    // Download attachment files in bounded batches to limit concurrency.
    const attachments = await processBatch(
      transactionsData ?? [],
      ATTACHMENT_BATCH_SIZE,
      async (batch) => {
        const batchAttachments = await Promise.all(
          batch.flatMap((transaction, idx) => {
            const rowId = idx + 1;
            return (transaction.attachments ?? []).map(
              async (attachment, idx2: number) => {
                const filename = attachment.name?.split(".").at(0);
                const extension = attachment.name?.split(".").at(-1);

                // BUG FIX: the previous code emitted the literal text
                // "$(unknown)" — `$()` is not template interpolation —
                // and never used the computed `filename`. Duplicate
                // attachments on the same row get an `_N` suffix.
                const name =
                  idx2 > 0
                    ? `${filename}-${rowId}_${idx2}.${extension}`
                    : `${filename}-${rowId}.${extension}`;

                const { data } = await download(supabase, {
                  bucket: "vault",
                  path: (attachment.path ?? []).join("/"),
                });

                return {
                  id: transaction.id,
                  name,
                  // Serialize the blob so it survives the task boundary.
                  blob: data ? await blobToSerializable(data) : null,
                };
              },
            );
          }),
        );

        return batchAttachments.flat();
      },
    );

    // NOTE(review): attachment names embed the pre-sort row index, while
    // rows below are date-sorted — the numbers may not line up with CSV
    // row order; confirm whether that matters to consumers.
    // slice() avoids mutating the query result in place.
    const rows = transactionsData
      ?.slice()
      .sort((a, b) => new Date(a.date).getTime() - new Date(b.date).getTime())
      .map((transaction) => [
        transaction.date,
        transaction.name,
        transaction.description,
        transaction.amount,
        transaction.currency,
        Intl.NumberFormat(locale, {
          style: "currency",
          currency: transaction.currency,
        }).format(transaction.amount),
        transaction?.vat
          ? Intl.NumberFormat(locale, {
              style: "currency",
              currency: transaction.currency,
            }).format(transaction?.vat)
          : "",
        transaction?.category?.name ?? "",
        transaction?.category?.description ?? "",
        // Status column: whether the transaction has any attachments.
        (transaction?.attachments?.length ?? 0) > 0 ? "✔️" : "❌",
        // Names of the exported attachment files for this transaction.
        // (join() never returns nullish, so the old `?? ""` was dead.)
        attachments
          .filter((a) => a.id === transaction.id)
          .map((a) => a.name)
          .join(", "),
        transaction?.balance ?? "",
        transaction?.bank_account?.name ?? "",
        transaction?.note ?? "",
      ]);

    return {
      rows: rows ?? [],
      attachments: attachments ?? [],
    };
  },
});
8 changes: 8 additions & 0 deletions apps/dashboard/jobs/utils/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Convert a Blob into a plain array of byte values so it can cross the
// JSON-serialized task boundary (see serializableToBlob for the inverse).
export async function blobToSerializable(blob: Blob) {
  const bytes = new Uint8Array(await blob.arrayBuffer());
  return [...bytes];
}

// Rebuild a Blob from the byte array produced by blobToSerializable.
// contentType defaults to the empty string, matching the Blob default.
export function serializableToBlob(array: number[], contentType = "") {
  const bytes = Uint8Array.from(array);
  return new Blob([bytes], { type: contentType });
}
17 changes: 9 additions & 8 deletions apps/dashboard/src/actions/export-transactions-action.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"use server";

import { LogEvents } from "@midday/events/events";
import { Events, client } from "@midday/jobs";
import { exportTransactions } from "jobs/tasks/transactions/export";
import { authActionClient } from "./safe-action";
import { exportTransactionsSchema } from "./schema";

Expand All @@ -15,13 +15,14 @@ export const exportTransactionsAction = authActionClient
},
})
.action(async ({ parsedInput: transactionIds, ctx: { user } }) => {
const event = await client.sendEvent({
name: Events.TRANSACTIONS_EXPORT,
payload: {
transactionIds,
teamId: user.team_id,
locale: user.locale,
},
if (!user.team_id || !user.locale) {
throw new Error("User not found");
}

const event = await exportTransactions.trigger({
teamId: user.team_id,
locale: user.locale,
transactionIds,
});

return event;
Expand Down
Loading

0 comments on commit 16e6f83

Please sign in to comment.