From d2509d73c1a7d30213f84219f62b7118fcf6c5fa Mon Sep 17 00:00:00 2001 From: Futa Arai Date: Wed, 4 Dec 2024 09:47:59 +0900 Subject: [PATCH 1/4] enable page bulk export for all file upload types --- .../interfaces/page-bulk-export.ts | 2 - .../server/models/page-bulk-export-job.ts | 2 - .../page-bulk-export-job-cron/index.ts | 11 +-- .../steps/compress-and-upload-async.ts | 90 +++++-------------- .../steps/create-page-snapshots-async.ts | 2 +- .../steps/export-pages-to-fs-async.ts | 2 +- .../src/server/service/file-uploader/azure.ts | 6 +- .../service/file-uploader/file-uploader.ts | 6 +- .../server/service/file-uploader/gridfs.ts | 4 +- .../src/server/service/file-uploader/local.ts | 2 +- 10 files changed, 34 insertions(+), 93 deletions(-) diff --git a/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts b/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts index b4ffbd50e25..5cd59a92bbd 100644 --- a/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts +++ b/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts @@ -30,8 +30,6 @@ export interface IPageBulkExportJob { user: Ref, // user that started export job page: Ref, // the root page of page tree to export lastExportedPagePath?: string, // the path of page that was exported to the fs last - uploadId?: string, // upload ID of multipart upload of S3/GCS - uploadKey?: string, // upload key of multipart upload of S3/GCS format: PageBulkExportFormat, completedAt?: Date, // the date at which job was completed attachment?: Ref, diff --git a/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts b/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts index 1611540ac58..2271316e4f9 100644 --- a/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts +++ b/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts @@ -13,8 +13,6 @@ const pageBulkExportJobSchema = new Schema({ user: { type: Schema.Types.ObjectId, ref: 'User', required: true }, page: { type: Schema.Types.ObjectId, ref: 'Page', required: true }, lastExportedPagePath: { type: String }, - uploadId: { type: String, unique: true, sparse: true }, - uploadKey: { type: String, unique: true, sparse: true }, format: { type: String, enum: Object.values(PageBulkExportFormat), required: true }, completedAt: { type: Date }, attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' }, diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts index cbdd28920ce..cf1f8004c5d 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts @@ -13,7 +13,6 @@ import type { ActivityDocument } from '~/server/models/activity'; import type { PageModel } from '~/server/models/page'; import { configManager } from '~/server/service/config-manager'; import CronService from '~/server/service/cron'; -import type { FileUploader } from '~/server/service/file-uploader'; import { preNotifyService } from '~/server/service/pre-notify'; import loggerFactory from '~/utils/logger'; @@ -38,7 +37,7 @@ export interface IPageBulkExportJobCronService { maxPartSize: number; compressExtension: string; setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void; - handlePipelineError(err: Error | null, 
pageBulkExportJob: PageBulkExportJobDocument): void; + handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void; notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise; getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string; } @@ -151,7 +150,7 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor } } - async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) { + async handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) { if (err == null) return; if (err instanceof BulkExportJobExpiredError) { @@ -195,7 +194,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor * Do the following in parallel: * - delete page snapshots * - remove the temporal output directory - * - abort multipart upload */ async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) { const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id); @@ -214,11 +212,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }), ]; - const fileUploadService: FileUploader = this.crowi.fileUploadService; - if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) { - promises.push(fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId)); - } - const results = await Promise.allSettled(promises); results.forEach((result) => { if (result.status === 'rejected') logger.error(result.reason); diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts index 7359e39336b..c8c6051fea8 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts @@ -1,17 +1,12 @@ -import { Writable, pipeline } from 'stream'; - import type { Archiver } from 'archiver'; import archiver from 'archiver'; -import gc from 'expose-gc/function'; import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export'; import { SupportedAction } from '~/interfaces/activity'; -import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment'; +import { AttachmentType } from '~/server/interfaces/attachment'; import type { IAttachmentDocument } from '~/server/models/attachment'; import { Attachment } from '~/server/models/attachment'; import type { FileUploader } from '~/server/service/file-uploader'; -import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader'; -import { getBufferToFixedSizeTransform } from '~/server/util/stream'; import loggerFactory from '~/utils/logger'; import type { IPageBulkExportJobCronService } from '..'; @@ -33,85 +28,42 @@ function setUpPageArchiver(): Archiver { return pageArchiver; } -function getMultipartUploadWritable( - this: IPageBulkExportJobCronService, - multipartUploader: IMultipartUploader, - pageBulkExportJob: PageBulkExportJobDocument, - attachment: IAttachmentDocument, -): Writable { - let partNumber = 1; - - return new Writable({ - write: 
async(part: Buffer, encoding, callback) => { - try { - await multipartUploader.uploadPart(part, partNumber); - partNumber += 1; - // First aid to prevent unexplained memory leaks - logger.info('global.gc() invoked.'); - gc(); - } - catch (err) { - await multipartUploader.abortUpload(); - callback(err); - return; - } - callback(); - }, - final: async(callback) => { - try { - await multipartUploader.completeUpload(); +async function postProcess( + this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument, attachment: IAttachmentDocument, fileSize: number, +): Promise { + attachment.fileSize = fileSize; + await attachment.save(); - const fileSize = await multipartUploader.getUploadedFileSize(); - attachment.fileSize = fileSize; - await attachment.save(); - - pageBulkExportJob.completedAt = new Date(); - pageBulkExportJob.attachment = attachment._id; - pageBulkExportJob.status = PageBulkExportJobStatus.completed; - await pageBulkExportJob.save(); + pageBulkExportJob.completedAt = new Date(); + pageBulkExportJob.attachment = attachment._id; + pageBulkExportJob.status = PageBulkExportJobStatus.completed; + await pageBulkExportJob.save(); - await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob); - } - catch (err) { - callback(err); - return; - } - callback(); - }, - }); + await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob); } - /** * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage */ export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise { const pageArchiver = setUpPageArchiver(); - const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize); if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set'); const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`; const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT); - const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`; const fileUploadService: FileUploader = this.crowi.fileUploadService; - // if the process of uploading was interrupted, delete and start from the start - if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) { - await fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId); - } - // init multipart upload - const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize); - await multipartUploader.initUpload(); - pageBulkExportJob.uploadKey = uploadKey; - pageBulkExportJob.uploadId = multipartUploader.uploadId; - await pageBulkExportJob.save(); - - const multipartUploadWritable = getMultipartUploadWritable.bind(this)(multipartUploader, pageBulkExportJob, attachment); - - pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable, (err) => { - this.handlePipelineError(err, pageBulkExportJob); - }); pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false); pageArchiver.finalize(); + this.setStreamInExecution(pageBulkExportJob._id, pageArchiver); + + try { + await fileUploadService.uploadAttachment(pageArchiver, attachment); + } + catch (e) { + logger.error(e); + this.handleError(e, 
pageBulkExportJob); + } + await postProcess.bind(this)(pageBulkExportJob, attachment, pageArchiver.pointer()); } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts index 02d8bb19e9b..832a6316f3c 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts @@ -85,6 +85,6 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi this.setStreamInExecution(pageBulkExportJob._id, pagesReadable); pipeline(pagesReadable, pageSnapshotsWritable, (err) => { - this.handlePipelineError(err, pageBulkExportJob); + this.handleError(err, pageBulkExportJob); }); } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts index dd7994c28b0..fb812e69a5e 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts @@ -68,6 +68,6 @@ export function exportPagesToFsAsync(this: IPageBulkExportJobCronService, pageBu this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable); pipeline(pageSnapshotsReadable, pagesWritable, (err) => { - this.handlePipelineError(err, pageBulkExportJob); + this.handleError(err, pageBulkExportJob); }); } diff --git a/apps/app/src/server/service/file-uploader/azure.ts b/apps/app/src/server/service/file-uploader/azure.ts index 654f5ca30e6..5fa6aaed661 100644 --- a/apps/app/src/server/service/file-uploader/azure.ts +++ b/apps/app/src/server/service/file-uploader/azure.ts @@ -1,4 +1,4 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; import type { TokenCredential } from '@azure/identity'; import { ClientSecretCredential } from '@azure/identity'; @@ -102,7 +102,7 @@ class AzureFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { if (!this.getIsUploadable()) { throw new Error('Azure is not configured.'); } @@ -113,7 +113,7 @@ class AzureFileUploader extends AbstractFileUploader { const blockBlobClient: BlockBlobClient = containerClient.getBlockBlobClient(filePath); const contentHeaders = new ContentHeaders(attachment); - await blockBlobClient.uploadStream(readStream, undefined, undefined, { + await blockBlobClient.uploadStream(readable, undefined, undefined, { blobHTTPHeaders: { // put type and the file name for reference information when uploading blobContentType: contentHeaders.contentType?.value.toString(), diff --git a/apps/app/src/server/service/file-uploader/file-uploader.ts b/apps/app/src/server/service/file-uploader/file-uploader.ts index 5677c583b4e..957da5cc1ab 100644 --- a/apps/app/src/server/service/file-uploader/file-uploader.ts +++ b/apps/app/src/server/service/file-uploader/file-uploader.ts @@ -1,4 +1,4 @@ -import type { ReadStream } from 'fs'; +import 
type { Readable } from 'stream'; import type { Response } from 'express'; import { v4 as uuidv4 } from 'uuid'; @@ -39,7 +39,7 @@ export interface FileUploader { getTotalFileSize(): Promise, doCheckLimit(uploadFileSize: number, maxFileSize: number, totalLimit: number): Promise, determineResponseMode(): ResponseMode, - uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise, + uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise, respond(res: Response, attachment: IAttachmentDocument, opts?: RespondOptions): void, findDeliveryFile(attachment: IAttachmentDocument): Promise, generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise, @@ -164,7 +164,7 @@ export abstract class AbstractFileUploader implements FileUploader { throw new Error('Multipart upload not available for file upload type'); } - abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise; + abstract uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise; /** * Abort an existing multipart upload without creating a MultipartUploader instance diff --git a/apps/app/src/server/service/file-uploader/gridfs.ts b/apps/app/src/server/service/file-uploader/gridfs.ts index 93f99895cd5..4f4997866da 100644 --- a/apps/app/src/server/service/file-uploader/gridfs.ts +++ b/apps/app/src/server/service/file-uploader/gridfs.ts @@ -62,7 +62,7 @@ class GridfsFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { logger.debug(`File uploading: fileName=${attachment.fileName}`); const contentHeaders = new ContentHeaders(attachment); @@ -73,7 +73,7 @@ class GridfsFileUploader extends AbstractFileUploader { filename: attachment.fileName, contentType: contentHeaders.contentType?.value.toString(), }, - readStream, + readable, ); } diff --git a/apps/app/src/server/service/file-uploader/local.ts b/apps/app/src/server/service/file-uploader/local.ts index f7236a378ba..77c3dee3499 100644 --- a/apps/app/src/server/service/file-uploader/local.ts +++ b/apps/app/src/server/service/file-uploader/local.ts @@ -76,7 +76,7 @@ class LocalFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { throw new Error('Method not implemented.'); } From 8b0b790262f57a537088d3f0906cdea79ff02b30 Mon Sep 17 00:00:00 2001 From: Futa Arai Date: Fri, 6 Dec 2024 09:30:27 +0900 Subject: [PATCH 2/4] enable bulk export for file upload types other than s3/gcs --- .../static/locales/en_US/translation.json | 1 - .../static/locales/fr_FR/translation.json | 1 - .../static/locales/ja_JP/translation.json | 1 - .../static/locales/zh_CN/translation.json | 1 - .../Navbar/GrowiContextualSubNavigation.tsx | 33 +++++++------------ .../page-bulk-export-job-clean-up-cron.ts | 3 +- .../page-bulk-export-job-cron/index.ts | 4 +-- ...upload-async.ts => compress-and-upload.ts} | 2 +- apps/app/src/pages/[[...path]].page.tsx | 3 +- 9 files changed, 19 insertions(+), 30 deletions(-) rename apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/{compress-and-upload-async.ts => compress-and-upload.ts} (94%) diff --git 
a/apps/app/public/static/locales/en_US/translation.json b/apps/app/public/static/locales/en_US/translation.json index a2c5bd26112..6f15105fc78 100644 --- a/apps/app/public/static/locales/en_US/translation.json +++ b/apps/app/public/static/locales/en_US/translation.json @@ -666,7 +666,6 @@ "bulk_export_started": "Please wait a moment...", "bulk_export_download_expired": "Download period has expired", "bulk_export_job_expired": "Export process was canceled because it took too long", - "bulk_export_only_available_for": "Only available for AWS or GCP", "export_in_progress": "Export in progress", "export_in_progress_explanation": "Export with the same format is already in progress. Would you like to restart to export the latest page contents?", "export_cancel_warning": "The following export in progress will be canceled", diff --git a/apps/app/public/static/locales/fr_FR/translation.json b/apps/app/public/static/locales/fr_FR/translation.json index 38701ce4342..914a989153c 100644 --- a/apps/app/public/static/locales/fr_FR/translation.json +++ b/apps/app/public/static/locales/fr_FR/translation.json @@ -659,7 +659,6 @@ "bulk_export_started": "Patientez s'il-vous-plait...", "bulk_export_download_expired": "La période de téléchargement a expiré", "bulk_export_job_expired": "Le traitement a été interrompu car le temps d'exportation était trop long", - "bulk_export_only_available_for": "Uniquement disponible pour AWS ou GCP", "export_in_progress": "Exportation en cours", "export_in_progress_explanation": "L'exportation avec le même format est déjà en cours. Souhaitez-vous redémarrer pour exporter le dernier contenu de la page ?", "export_cancel_warning": "Les exportations suivantes en cours seront annulées", diff --git a/apps/app/public/static/locales/ja_JP/translation.json b/apps/app/public/static/locales/ja_JP/translation.json index 82b6ab52c95..73ff28aedb3 100644 --- a/apps/app/public/static/locales/ja_JP/translation.json +++ b/apps/app/public/static/locales/ja_JP/translation.json @@ -698,7 +698,6 @@ "bulk_export_started": "ただいま準備中です...", "bulk_export_download_expired": "ダウンロード期限が切れました", "bulk_export_job_expired": "エクスポート時間が長すぎるため、処理が中断されました", - "bulk_export_only_available_for": "AWS と GCP のみ対応しています", "export_in_progress": "エクスポート進行中", "export_in_progress_explanation": "既に同じ形式でのエクスポートが進行中です。最新のページ内容でエクスポートを最初からやり直しますか?", "export_cancel_warning": "進行中の以下のエクスポートはキャンセルされます", diff --git a/apps/app/public/static/locales/zh_CN/translation.json b/apps/app/public/static/locales/zh_CN/translation.json index 13282506e55..38fc6bbe03d 100644 --- a/apps/app/public/static/locales/zh_CN/translation.json +++ b/apps/app/public/static/locales/zh_CN/translation.json @@ -668,7 +668,6 @@ "bulk_export_started": "目前我们正在准备...", "bulk_export_download_expired": "下载期限已过", "bulk_export_job_expired": "由于导出时间太长,处理被中断", - "bulk_export_only_available_for": "仅适用于 AWS 或 GCP", "export_in_progress": "导出正在进行中", "export_in_progress_explanation": "已在进行相同格式的导出。您要重新启动以导出最新的页面内容吗?", "export_cancel_warning": "以下正在进行的导出将被取消", diff --git a/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx b/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx index 6d524794c3c..fb3885afc07 100644 --- a/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx +++ b/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx @@ -85,8 +85,6 @@ const PageOperationMenuItems = (props: PageOperationMenuItemsProps): JSX.Element const { data: codeMirrorEditor } = 
useCodeMirrorEditorIsolated(GlobalCodeMirrorEditorKey.MAIN); - const [isBulkExportTooltipOpen, setIsBulkExportTooltipOpen] = useState(false); - const syncLatestRevisionBodyHandler = useCallback(async() => { // eslint-disable-next-line no-alert const answer = window.confirm(t('sync-latest-revision-body.confirm')); @@ -144,25 +142,18 @@ const PageOperationMenuItems = (props: PageOperationMenuItemsProps): JSX.Element {/* Bulk export */} - - - cloud_download - {t('page_export.bulk_export')} - - - setIsBulkExportTooltipOpen(!isBulkExportTooltipOpen)} - > - {t('page_export.bulk_export_only_available_for')} - + {isPageBulkExportEnabled && ( + + + cloud_download + {t('page_export.bulk_export')} + + + )} diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts index 97b974be230..11c09cc9c28 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts @@ -29,7 +29,8 @@ class PageBulkExportJobCleanUpCronService extends CronService { } override async executeJob(): Promise { - const isPageBulkExportEnabled = PageBulkExportEnabledFileUploadTypes.includes(configManager.getConfig('crowi', 'app:fileUploadType')); + // TODO: allow enabling/disabling bulk export in https://redmine.weseek.co.jp/issues/158221 + const isPageBulkExportEnabled = true; if (!isPageBulkExportEnabled) return; await this.deleteExpiredExportJobs(); diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts index d292f324ade..98f807462fc 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts @@ -23,7 +23,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors'; -import { compressAndUploadAsync } from './steps/compress-and-upload-async'; +import { compressAndUpload } from './steps/compress-and-upload'; import { createPageSnapshotsAsync } from './steps/create-page-snapshots-async'; import { exportPagesToFsAsync } from './steps/export-pages-to-fs-async'; @@ -156,7 +156,7 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor exportPagesToFsAsync.bind(this)(pageBulkExportJob); } else if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) { - await compressAndUploadAsync.bind(this)(user, pageBulkExportJob); + compressAndUpload.bind(this)(user, pageBulkExportJob); } } catch (err) { diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts similarity index 94% rename from apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts rename to apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts index c8c6051fea8..c4456db9f84 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts +++ 
b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts @@ -45,7 +45,7 @@ async function postProcess( /** * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage */ -export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise { +export async function compressAndUpload(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise { const pageArchiver = setUpPageArchiver(); if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set'); diff --git a/apps/app/src/pages/[[...path]].page.tsx b/apps/app/src/pages/[[...path]].page.tsx index 4d44278345b..04165387a8f 100644 --- a/apps/app/src/pages/[[...path]].page.tsx +++ b/apps/app/src/pages/[[...path]].page.tsx @@ -585,7 +585,8 @@ function injectServerConfigurations(context: GetServerSidePropsContext, props: P props.disableLinkSharing = configManager.getConfig('crowi', 'security:disableLinkSharing'); props.isUploadAllFileAllowed = crowi.fileUploadService.getFileUploadEnabled(); props.isUploadEnabled = crowi.fileUploadService.getIsUploadable(); - props.isPageBulkExportEnabled = PageBulkExportEnabledFileUploadTypes.includes(configManager.getConfig('crowi', 'app:fileUploadType')); + // TODO: allow enabling/disabling bulk export in https://redmine.weseek.co.jp/issues/158221 + props.isPageBulkExportEnabled = true; props.isLocalAccountRegistrationEnabled = crowi.passportService.isLocalStrategySetup && configManager.getConfig('crowi', 'security:registrationMode') !== RegistrationMode.CLOSED; From e38080f196d566c4a257e7e80f56d2b175a18106 Mon Sep 17 00:00:00 2001 From: Futa Arai Date: Fri, 6 Dec 2024 09:43:19 +0900 Subject: [PATCH 3/4] remove job stream when job completes --- .../server/service/page-bulk-export-job-cron/index.ts | 3 ++- .../page-bulk-export-job-cron/steps/compress-and-upload.ts | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts index 98f807462fc..3beaafb6828 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts @@ -36,6 +36,7 @@ export interface IPageBulkExportJobCronService { maxPartSize: number; compressExtension: string; setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void; + removeStreamInExecution(jobId: ObjectIdLike): void; handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void; notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise; getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string; @@ -224,8 +225,8 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor else { streamInExecution.destroy(new BulkExportJobExpiredError()); } + this.removeStreamInExecution(pageBulkExportJob._id); } - this.removeStreamInExecution(pageBulkExportJob._id); const promises = [ PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }), diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts 
b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts index c4456db9f84..06b64f7895f 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts @@ -39,6 +39,7 @@ async function postProcess( pageBulkExportJob.status = PageBulkExportJobStatus.completed; await pageBulkExportJob.save(); + this.removeStreamInExecution(pageBulkExportJob._id); await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob); } From a6a96078e3dc49e9365dee4ff57d3d42a3c1d6db Mon Sep 17 00:00:00 2001 From: Futa Arai Date: Mon, 9 Dec 2024 23:20:49 +0900 Subject: [PATCH 4/4] modify gcs uploadAttachment to use stream --- .../src/server/service/file-uploader/aws/index.ts | 6 +++--- .../src/server/service/file-uploader/gcs/index.ts | 12 +++++++----- apps/app/src/server/service/file-uploader/gridfs.ts | 1 - apps/app/src/server/service/file-uploader/local.ts | 1 - 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/app/src/server/service/file-uploader/aws/index.ts b/apps/app/src/server/service/file-uploader/aws/index.ts index a39266af2a5..a0a9432d152 100644 --- a/apps/app/src/server/service/file-uploader/aws/index.ts +++ b/apps/app/src/server/service/file-uploader/aws/index.ts @@ -1,4 +1,4 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; import type { GetObjectCommandInput, HeadObjectCommandInput } from '@aws-sdk/client-s3'; import { @@ -157,7 +157,7 @@ class AwsFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { if (!this.getIsUploadable()) { throw new Error('AWS is not configured.'); } @@ -172,7 +172,7 @@ class AwsFileUploader extends AbstractFileUploader { await s3.send(new PutObjectCommand({ Bucket: getS3Bucket(), Key: filePath, - Body: readStream, + Body: readable, ACL: getS3PutObjectCannedAcl(), // put type and the file name for reference information when uploading ContentType: contentHeaders.contentType?.value.toString(), diff --git a/apps/app/src/server/service/file-uploader/gcs/index.ts b/apps/app/src/server/service/file-uploader/gcs/index.ts index 1882b5dfd65..4fde4ece518 100644 --- a/apps/app/src/server/service/file-uploader/gcs/index.ts +++ b/apps/app/src/server/service/file-uploader/gcs/index.ts @@ -1,4 +1,5 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; +import { pipeline } from 'stream/promises'; import { Storage } from '@google-cloud/storage'; import axios from 'axios'; @@ -109,7 +110,7 @@ class GcsFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { if (!this.getIsUploadable()) { throw new Error('GCS is not configured.'); } @@ -121,11 +122,12 @@ class GcsFileUploader extends AbstractFileUploader { const filePath = getFilePathOnStorage(attachment); const contentHeaders = new ContentHeaders(attachment); - await myBucket.upload(readStream.path.toString(), { - destination: filePath, + const file = myBucket.file(filePath); + + await 
pipeline(readable, file.createWriteStream({ // put type and the file name for reference information when uploading contentType: contentHeaders.contentType?.value.toString(), - }); + })); } /** diff --git a/apps/app/src/server/service/file-uploader/gridfs.ts b/apps/app/src/server/service/file-uploader/gridfs.ts index 4f4997866da..e8b30af5b8c 100644 --- a/apps/app/src/server/service/file-uploader/gridfs.ts +++ b/apps/app/src/server/service/file-uploader/gridfs.ts @@ -1,4 +1,3 @@ -import type { ReadStream } from 'fs'; import { Readable } from 'stream'; import util from 'util'; diff --git a/apps/app/src/server/service/file-uploader/local.ts b/apps/app/src/server/service/file-uploader/local.ts index 77c3dee3499..40eaf6bb2de 100644 --- a/apps/app/src/server/service/file-uploader/local.ts +++ b/apps/app/src/server/service/file-uploader/local.ts @@ -1,4 +1,3 @@ -import type { ReadStream } from 'fs'; import type { Writable } from 'stream'; import { Readable } from 'stream'; import { pipeline } from 'stream/promises';
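
For reference, a minimal standalone sketch of the streaming upload pattern this series converges on: archiver output piped straight into a storage write stream via stream/promises, instead of buffering fixed-size parts for an S3/GCS multipart upload. The bucket name, function name, and paths below are placeholders for illustration, not values taken from this repository.

    import { pipeline } from 'stream/promises';

    import archiver from 'archiver';
    import { Storage } from '@google-cloud/storage';

    async function uploadDirAsZip(srcDir: string, destPath: string): Promise<void> {
      const storage = new Storage();                      // credentials via Application Default Credentials
      const bucket = storage.bucket('example-bucket');    // placeholder bucket name

      const archive = archiver('zip', { zlib: { level: 9 } });
      archive.directory(srcDir, false);                   // add directory contents at the archive root
      archive.finalize();                                 // no more entries will be appended

      // pipeline() pipes the archive into the write stream and rejects on error,
      // mirroring the pipeline(readable, file.createWriteStream(...)) call added to gcs/index.ts
      await pipeline(
        archive,
        bucket.file(destPath).createWriteStream({ contentType: 'application/zip' }),
      );
    }

With uploadAttachment accepting a Readable, the Azure, GridFS, S3, and GCS uploaders all consume the same archiver stream, which is what lets patch 1 drop the uploadId/uploadKey multipart bookkeeping and the abortPreviousMultipartUpload clean-up step.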