Skip to content

Commit

Permalink
feat: Implement worker threads for upload processing
Browse files Browse the repository at this point in the history
- Offload flashcard generation to a separate worker thread
- Prevents blocking of main thread during uploads, improving responsiveness
- Addresses bug causing server slowdown during large uploads (#123)

Refs:
https://nodejs.org/api/worker_threads.html
https://stackoverflow.com/questions/61095741/how-do-i-avoid-blocking-an-express-rest-service
  • Loading branch information
aalemayhu committed Aug 31, 2024
1 parent 70ee130 commit 4fde2cc
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 91 deletions.
101 changes: 10 additions & 91 deletions src/usecases/uploads/GeneratePackagesUseCase.ts
Original file line number Diff line number Diff line change
@@ -1,108 +1,27 @@
import fs from 'fs';

import { ZipHandler } from '../../lib/anki/zip';
import Package from '../../lib/parser/Package';
import Settings from '../../lib/parser/Settings';
import {
isCSVFile,
isHTMLFile,
isMarkdownFile,
isPlainText,
isZIPFile,
} from '../../lib/storage/checks';
import { UploadedFile } from '../../lib/storage/types';

import { Body } from 'aws-sdk/clients/s3';
import { PrepareDeck } from '../../lib/parser/PrepareDeck';
import { checkFlashcardsLimits } from '../../lib/User/checkFlashcardsLimits';
import { Worker } from 'worker_threads';
import path from 'path';

export interface PackageResult {
packages: Package[];
}

export const isFileSupported = (filename: string) =>
isHTMLFile(filename) ??
isMarkdownFile(filename) ??
isPlainText(filename) ??
isCSVFile(filename);

const getPackagesFromZip = async (
fileContents: Body | undefined,
paying: boolean,
settings: Settings
): Promise<PackageResult> => {
const zipHandler = new ZipHandler();
const packages = [];

if (!fileContents) {
return { packages: [] };
}

zipHandler.build(fileContents as Uint8Array, paying);

const fileNames = zipHandler.getFileNames();

let cardCount = 0;
for (const fileName of fileNames) {
if (isFileSupported(fileName)) {
const deck = await PrepareDeck(fileName, zipHandler.files, settings);

if (deck) {
packages.push(new Package(deck.name, deck.apkg));
cardCount += deck.deck.reduce((acc, d) => acc + d.cards.length, 0);

// Checking the limit in place while iterating through the decks
checkFlashcardsLimits({
cards: 0,
decks: deck.deck,
paying,
});
}
}

// Checking the limit in place while iterating through the files
checkFlashcardsLimits({
cards: cardCount,
paying: paying,
});
}

return { packages };
};

class GeneratePackagesUseCase {
async execute(
execute(
paying: boolean,
files: UploadedFile[],
settings: Settings
): Promise<PackageResult> {
let packages: Package[] = [];
return new Promise((resolve, reject) => {
const data = { paying, files, settings };
const workerPath = path.resolve(__dirname, './worker.js');
const worker = new Worker(workerPath, { workerData: { data } });

for (const file of files) {
const fileContents = file.path ? fs.readFileSync(file.path) : file.buffer;
const filename = file.originalname;
const key = file.key;

if (isFileSupported(filename)) {
const d = await PrepareDeck(
filename,
[{ name: filename, contents: fileContents }],
settings
);
if (d) {
const pkg = new Package(d.name, d.apkg);
packages = packages.concat(pkg);
}
} else if (isZIPFile(filename) || isZIPFile(key)) {
const { packages: extraPackages } = await getPackagesFromZip(
fileContents,
paying,
settings
);
packages = packages.concat(extraPackages);
}
}
return { packages };
worker.on('message', (result: PackageResult) => resolve(result));
worker.on('error', (error) => reject(error));
});
}
}

Expand Down
63 changes: 63 additions & 0 deletions src/usecases/uploads/getPackagesFromZip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { Body } from 'aws-sdk/clients/s3';
import Settings from '../../lib/parser/Settings';
import { ZipHandler } from '../../lib/anki/zip';
import { PrepareDeck } from '../../lib/parser/PrepareDeck';
import Package from '../../lib/parser/Package';
import { checkFlashcardsLimits } from '../../lib/User/checkFlashcardsLimits';
import { PackageResult } from './GeneratePackagesUseCase';
import {
isCSVFile,
isHTMLFile,
isMarkdownFile,
isPlainText,
} from '../../lib/storage/checks';

export const isFileSupported = (filename: string) =>
isHTMLFile(filename) ??
isMarkdownFile(filename) ??
isPlainText(filename) ??
isCSVFile(filename);

export const getPackagesFromZip = async (
fileContents: Body | undefined,
paying: boolean,
settings: Settings
): Promise<PackageResult> => {
const zipHandler = new ZipHandler();
const packages = [];

if (!fileContents) {
return { packages: [] };
}

zipHandler.build(fileContents as Uint8Array, paying);

const fileNames = zipHandler.getFileNames();

let cardCount = 0;
for (const fileName of fileNames) {
if (isFileSupported(fileName)) {
const deck = await PrepareDeck(fileName, zipHandler.files, settings);

if (deck) {
packages.push(new Package(deck.name, deck.apkg));
cardCount += deck.deck.reduce((acc, d) => acc + d.cards.length, 0);

// Checking the limit in place while iterating through the decks
checkFlashcardsLimits({
cards: 0,
decks: deck.deck,
paying,
});
}
}

// Checking the limit in place while iterating through the files
checkFlashcardsLimits({
cards: cardCount,
paying: paying,
});
}

return { packages };
};
55 changes: 55 additions & 0 deletions src/usecases/uploads/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { parentPort, workerData } from 'worker_threads';
import { UploadedFile } from '../../lib/storage/types';
import Settings from '../../lib/parser/Settings';
import Package from '../../lib/parser/Package';
import fs from 'fs';
import { PrepareDeck } from '../../lib/parser/PrepareDeck';
import { isZIPFile } from '../../lib/storage/checks';
import { getPackagesFromZip, isFileSupported } from './getPackagesFromZip';

interface GenerationData {
paying: boolean;
files: UploadedFile[];
settings: Settings;
}

function doGenerationWork(data: GenerationData) {
console.log('doGenerationWork');
return new Promise(async (resolve) => {
console.log('starting generation');
const { paying, files, settings } = data;
let packages: Package[] = [];

for (const file of files) {
const fileContents = file.path ? fs.readFileSync(file.path) : file.buffer;
const filename = file.originalname;
const key = file.key;

if (isFileSupported(filename)) {
const d = await PrepareDeck(
filename,
[{ name: filename, contents: fileContents }],
settings
);
if (d) {
const pkg = new Package(d.name, d.apkg);
packages = packages.concat(pkg);
}
} else if (isZIPFile(filename) || isZIPFile(key)) {
const { packages: extraPackages } = await getPackagesFromZip(
fileContents,
paying,
settings
);
packages = packages.concat(extraPackages);
}
}
resolve({ packages });
});
}

doGenerationWork(workerData.data)
.then((result) => {
parentPort?.postMessage(result);
})
.catch(parentPort?.postMessage);

0 comments on commit 4fde2cc

Please sign in to comment.