diff --git a/apps/app/public/static/locales/en_US/translation.json b/apps/app/public/static/locales/en_US/translation.json index a2c5bd26112..6f15105fc78 100644 --- a/apps/app/public/static/locales/en_US/translation.json +++ b/apps/app/public/static/locales/en_US/translation.json @@ -666,7 +666,6 @@ "bulk_export_started": "Please wait a moment...", "bulk_export_download_expired": "Download period has expired", "bulk_export_job_expired": "Export process was canceled because it took too long", - "bulk_export_only_available_for": "Only available for AWS or GCP", "export_in_progress": "Export in progress", "export_in_progress_explanation": "Export with the same format is already in progress. Would you like to restart to export the latest page contents?", "export_cancel_warning": "The following export in progress will be canceled", diff --git a/apps/app/public/static/locales/fr_FR/translation.json b/apps/app/public/static/locales/fr_FR/translation.json index 38701ce4342..914a989153c 100644 --- a/apps/app/public/static/locales/fr_FR/translation.json +++ b/apps/app/public/static/locales/fr_FR/translation.json @@ -659,7 +659,6 @@ "bulk_export_started": "Patientez s'il-vous-plait...", "bulk_export_download_expired": "La période de téléchargement a expiré", "bulk_export_job_expired": "Le traitement a été interrompu car le temps d'exportation était trop long", - "bulk_export_only_available_for": "Uniquement disponible pour AWS ou GCP", "export_in_progress": "Exportation en cours", "export_in_progress_explanation": "L'exportation avec le même format est déjà en cours. Souhaitez-vous redémarrer pour exporter le dernier contenu de la page ?", "export_cancel_warning": "Les exportations suivantes en cours seront annulées", diff --git a/apps/app/public/static/locales/ja_JP/translation.json b/apps/app/public/static/locales/ja_JP/translation.json index 82b6ab52c95..73ff28aedb3 100644 --- a/apps/app/public/static/locales/ja_JP/translation.json +++ b/apps/app/public/static/locales/ja_JP/translation.json @@ -698,7 +698,6 @@ "bulk_export_started": "ただいま準備中です...", "bulk_export_download_expired": "ダウンロード期限が切れました", "bulk_export_job_expired": "エクスポート時間が長すぎるため、処理が中断されました", - "bulk_export_only_available_for": "AWS と GCP のみ対応しています", "export_in_progress": "エクスポート進行中", "export_in_progress_explanation": "既に同じ形式でのエクスポートが進行中です。最新のページ内容でエクスポートを最初からやり直しますか?", "export_cancel_warning": "進行中の以下のエクスポートはキャンセルされます", diff --git a/apps/app/public/static/locales/zh_CN/translation.json b/apps/app/public/static/locales/zh_CN/translation.json index 13282506e55..38fc6bbe03d 100644 --- a/apps/app/public/static/locales/zh_CN/translation.json +++ b/apps/app/public/static/locales/zh_CN/translation.json @@ -668,7 +668,6 @@ "bulk_export_started": "目前我们正在准备...", "bulk_export_download_expired": "下载期限已过", "bulk_export_job_expired": "由于导出时间太长,处理被中断", - "bulk_export_only_available_for": "仅适用于 AWS 或 GCP", "export_in_progress": "导出正在进行中", "export_in_progress_explanation": "已在进行相同格式的导出。您要重新启动以导出最新的页面内容吗?", "export_cancel_warning": "以下正在进行的导出将被取消", diff --git a/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx b/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx index 6d524794c3c..fb3885afc07 100644 --- a/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx +++ b/apps/app/src/client/components/Navbar/GrowiContextualSubNavigation.tsx @@ -85,8 +85,6 @@ const PageOperationMenuItems = (props: PageOperationMenuItemsProps): JSX.Element const { data: codeMirrorEditor } = useCodeMirrorEditorIsolated(GlobalCodeMirrorEditorKey.MAIN); - const [isBulkExportTooltipOpen, setIsBulkExportTooltipOpen] = useState(false); - const syncLatestRevisionBodyHandler = useCallback(async() => { // eslint-disable-next-line no-alert const answer = window.confirm(t('sync-latest-revision-body.confirm')); @@ -144,25 +142,18 @@ const PageOperationMenuItems = (props: PageOperationMenuItemsProps): JSX.Element {/* Bulk export */} - - - cloud_download - {t('page_export.bulk_export')} - - - setIsBulkExportTooltipOpen(!isBulkExportTooltipOpen)} - > - {t('page_export.bulk_export_only_available_for')} - + {isPageBulkExportEnabled && ( + + + cloud_download + {t('page_export.bulk_export')} + + + )} diff --git a/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts b/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts index b4ffbd50e25..5cd59a92bbd 100644 --- a/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts +++ b/apps/app/src/features/page-bulk-export/interfaces/page-bulk-export.ts @@ -30,8 +30,6 @@ export interface IPageBulkExportJob { user: Ref, // user that started export job page: Ref, // the root page of page tree to export lastExportedPagePath?: string, // the path of page that was exported to the fs last - uploadId?: string, // upload ID of multipart upload of S3/GCS - uploadKey?: string, // upload key of multipart upload of S3/GCS format: PageBulkExportFormat, completedAt?: Date, // the date at which job was completed attachment?: Ref, diff --git a/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts b/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts index 1611540ac58..2271316e4f9 100644 --- a/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts +++ b/apps/app/src/features/page-bulk-export/server/models/page-bulk-export-job.ts @@ -13,8 +13,6 @@ const pageBulkExportJobSchema = new Schema({ user: { type: Schema.Types.ObjectId, ref: 'User', required: true }, page: { type: Schema.Types.ObjectId, ref: 'Page', required: true }, lastExportedPagePath: { type: String }, - uploadId: { type: String, unique: true, sparse: true }, - uploadKey: { type: String, unique: true, sparse: true }, format: { type: String, enum: Object.values(PageBulkExportFormat), required: true }, completedAt: { type: Date }, attachment: { type: Schema.Types.ObjectId, ref: 'Attachment' }, diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts index 97b974be230..11c09cc9c28 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts @@ -29,7 +29,8 @@ class PageBulkExportJobCleanUpCronService extends CronService { } override async executeJob(): Promise { - const isPageBulkExportEnabled = PageBulkExportEnabledFileUploadTypes.includes(configManager.getConfig('crowi', 'app:fileUploadType')); + // TODO: allow enabling/disabling bulk export in https://redmine.weseek.co.jp/issues/158221 + const isPageBulkExportEnabled = true; if (!isPageBulkExportEnabled) return; await this.deleteExpiredExportJobs(); diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts index 6764ffe56bd..3beaafb6828 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts @@ -13,7 +13,6 @@ import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils'; import type { ActivityDocument } from '~/server/models/activity'; import { configManager } from '~/server/service/config-manager'; import CronService from '~/server/service/cron'; -import type { FileUploader } from '~/server/service/file-uploader'; import { preNotifyService } from '~/server/service/pre-notify'; import loggerFactory from '~/utils/logger'; @@ -24,7 +23,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps import { BulkExportJobExpiredError, BulkExportJobRestartedError } from './errors'; -import { compressAndUploadAsync } from './steps/compress-and-upload-async'; +import { compressAndUpload } from './steps/compress-and-upload'; import { createPageSnapshotsAsync } from './steps/create-page-snapshots-async'; import { exportPagesToFsAsync } from './steps/export-pages-to-fs-async'; @@ -37,7 +36,8 @@ export interface IPageBulkExportJobCronService { maxPartSize: number; compressExtension: string; setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void; - handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void; + removeStreamInExecution(jobId: ObjectIdLike): void; + handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument): void; notifyExportResultAndCleanUp(action: SupportedActionType, pageBulkExportJob: PageBulkExportJobDocument): Promise; getTmpOutputDir(pageBulkExportJob: PageBulkExportJobDocument): string; } @@ -157,7 +157,7 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor exportPagesToFsAsync.bind(this)(pageBulkExportJob); } else if (pageBulkExportJob.status === PageBulkExportJobStatus.uploading) { - await compressAndUploadAsync.bind(this)(user, pageBulkExportJob); + compressAndUpload.bind(this)(user, pageBulkExportJob); } } catch (err) { @@ -167,11 +167,11 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor } /** - * Handle errors that occurred inside a stream pipeline + * Handle errors that occurred during page bulk export * @param err error * @param pageBulkExportJob PageBulkExportJob executed in the pipeline */ - async handlePipelineError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) { + async handleError(err: Error | null, pageBulkExportJob: PageBulkExportJobDocument) { if (err == null) return; if (err instanceof BulkExportJobExpiredError) { @@ -215,7 +215,6 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor * Do the following in parallel: * - delete page snapshots * - remove the temporal output directory - * - abort multipart upload */ async cleanUpExportJobResources(pageBulkExportJob: PageBulkExportJobDocument, restarted = false) { const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id); @@ -226,19 +225,14 @@ class PageBulkExportJobCronService extends CronService implements IPageBulkExpor else { streamInExecution.destroy(new BulkExportJobExpiredError()); } + this.removeStreamInExecution(pageBulkExportJob._id); } - this.removeStreamInExecution(pageBulkExportJob._id); const promises = [ PageBulkExportPageSnapshot.deleteMany({ pageBulkExportJob }), fs.promises.rm(this.getTmpOutputDir(pageBulkExportJob), { recursive: true, force: true }), ]; - const fileUploadService: FileUploader = this.crowi.fileUploadService; - if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) { - promises.push(fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId)); - } - const results = await Promise.allSettled(promises); results.forEach((result) => { if (result.status === 'rejected') logger.error(result.reason); diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts deleted file mode 100644 index 7359e39336b..00000000000 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload-async.ts +++ /dev/null @@ -1,117 +0,0 @@ -import { Writable, pipeline } from 'stream'; - -import type { Archiver } from 'archiver'; -import archiver from 'archiver'; -import gc from 'expose-gc/function'; - -import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export'; -import { SupportedAction } from '~/interfaces/activity'; -import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment'; -import type { IAttachmentDocument } from '~/server/models/attachment'; -import { Attachment } from '~/server/models/attachment'; -import type { FileUploader } from '~/server/service/file-uploader'; -import type { IMultipartUploader } from '~/server/service/file-uploader/multipart-uploader'; -import { getBufferToFixedSizeTransform } from '~/server/util/stream'; -import loggerFactory from '~/utils/logger'; - -import type { IPageBulkExportJobCronService } from '..'; -import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job'; - -const logger = loggerFactory('growi:service:page-bulk-export-job-cron:compress-and-upload-async'); - -function setUpPageArchiver(): Archiver { - const pageArchiver = archiver('tar', { - gzip: true, - }); - - // good practice to catch warnings (ie stat failures and other non-blocking errors) - pageArchiver.on('warning', (err) => { - if (err.code === 'ENOENT') logger.error(err); - else throw err; - }); - - return pageArchiver; -} - -function getMultipartUploadWritable( - this: IPageBulkExportJobCronService, - multipartUploader: IMultipartUploader, - pageBulkExportJob: PageBulkExportJobDocument, - attachment: IAttachmentDocument, -): Writable { - let partNumber = 1; - - return new Writable({ - write: async(part: Buffer, encoding, callback) => { - try { - await multipartUploader.uploadPart(part, partNumber); - partNumber += 1; - // First aid to prevent unexplained memory leaks - logger.info('global.gc() invoked.'); - gc(); - } - catch (err) { - await multipartUploader.abortUpload(); - callback(err); - return; - } - callback(); - }, - final: async(callback) => { - try { - await multipartUploader.completeUpload(); - - const fileSize = await multipartUploader.getUploadedFileSize(); - attachment.fileSize = fileSize; - await attachment.save(); - - pageBulkExportJob.completedAt = new Date(); - pageBulkExportJob.attachment = attachment._id; - pageBulkExportJob.status = PageBulkExportJobStatus.completed; - await pageBulkExportJob.save(); - - await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob); - } - catch (err) { - callback(err); - return; - } - callback(); - }, - }); -} - - -/** - * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage - */ -export async function compressAndUploadAsync(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise { - const pageArchiver = setUpPageArchiver(); - const bufferToPartSizeTransform = getBufferToFixedSizeTransform(this.maxPartSize); - - if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set'); - const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`; - const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT); - const uploadKey = `${FilePathOnStoragePrefix.pageBulkExport}/${attachment.fileName}`; - - const fileUploadService: FileUploader = this.crowi.fileUploadService; - // if the process of uploading was interrupted, delete and start from the start - if (pageBulkExportJob.uploadKey != null && pageBulkExportJob.uploadId != null) { - await fileUploadService.abortPreviousMultipartUpload(pageBulkExportJob.uploadKey, pageBulkExportJob.uploadId); - } - - // init multipart upload - const multipartUploader: IMultipartUploader = fileUploadService.createMultipartUploader(uploadKey, this.maxPartSize); - await multipartUploader.initUpload(); - pageBulkExportJob.uploadKey = uploadKey; - pageBulkExportJob.uploadId = multipartUploader.uploadId; - await pageBulkExportJob.save(); - - const multipartUploadWritable = getMultipartUploadWritable.bind(this)(multipartUploader, pageBulkExportJob, attachment); - - pipeline(pageArchiver, bufferToPartSizeTransform, multipartUploadWritable, (err) => { - this.handlePipelineError(err, pageBulkExportJob); - }); - pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false); - pageArchiver.finalize(); -} diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts new file mode 100644 index 00000000000..06b64f7895f --- /dev/null +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts @@ -0,0 +1,70 @@ +import type { Archiver } from 'archiver'; +import archiver from 'archiver'; + +import { PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export'; +import { SupportedAction } from '~/interfaces/activity'; +import { AttachmentType } from '~/server/interfaces/attachment'; +import type { IAttachmentDocument } from '~/server/models/attachment'; +import { Attachment } from '~/server/models/attachment'; +import type { FileUploader } from '~/server/service/file-uploader'; +import loggerFactory from '~/utils/logger'; + +import type { IPageBulkExportJobCronService } from '..'; +import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export-job'; + +const logger = loggerFactory('growi:service:page-bulk-export-job-cron:compress-and-upload-async'); + +function setUpPageArchiver(): Archiver { + const pageArchiver = archiver('tar', { + gzip: true, + }); + + // good practice to catch warnings (ie stat failures and other non-blocking errors) + pageArchiver.on('warning', (err) => { + if (err.code === 'ENOENT') logger.error(err); + else throw err; + }); + + return pageArchiver; +} + +async function postProcess( + this: IPageBulkExportJobCronService, pageBulkExportJob: PageBulkExportJobDocument, attachment: IAttachmentDocument, fileSize: number, +): Promise { + attachment.fileSize = fileSize; + await attachment.save(); + + pageBulkExportJob.completedAt = new Date(); + pageBulkExportJob.attachment = attachment._id; + pageBulkExportJob.status = PageBulkExportJobStatus.completed; + await pageBulkExportJob.save(); + + this.removeStreamInExecution(pageBulkExportJob._id); + await this.notifyExportResultAndCleanUp(SupportedAction.ACTION_PAGE_BULK_EXPORT_COMPLETED, pageBulkExportJob); +} + +/** + * Execute a pipeline that reads the page files from the temporal fs directory, compresses them, and uploads to the cloud storage + */ +export async function compressAndUpload(this: IPageBulkExportJobCronService, user, pageBulkExportJob: PageBulkExportJobDocument): Promise { + const pageArchiver = setUpPageArchiver(); + + if (pageBulkExportJob.revisionListHash == null) throw new Error('revisionListHash is not set'); + const originalName = `${pageBulkExportJob.revisionListHash}.${this.compressExtension}`; + const attachment = Attachment.createWithoutSave(null, user, originalName, this.compressExtension, 0, AttachmentType.PAGE_BULK_EXPORT); + + const fileUploadService: FileUploader = this.crowi.fileUploadService; + + pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false); + pageArchiver.finalize(); + this.setStreamInExecution(pageBulkExportJob._id, pageArchiver); + + try { + await fileUploadService.uploadAttachment(pageArchiver, attachment); + } + catch (e) { + logger.error(e); + this.handleError(e, pageBulkExportJob); + } + await postProcess.bind(this)(pageBulkExportJob, attachment, pageArchiver.pointer()); +} diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts index cdc0b514cec..d756900aab5 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts @@ -98,6 +98,6 @@ export async function createPageSnapshotsAsync(this: IPageBulkExportJobCronServi this.setStreamInExecution(pageBulkExportJob._id, pagesReadable); pipeline(pagesReadable, pageSnapshotsWritable, (err) => { - this.handlePipelineError(err, pageBulkExportJob); + this.handleError(err, pageBulkExportJob); }); } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts index caa9980b134..968804adcc2 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts @@ -74,6 +74,6 @@ export function exportPagesToFsAsync(this: IPageBulkExportJobCronService, pageBu this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable); pipeline(pageSnapshotsReadable, pagesWritable, (err) => { - this.handlePipelineError(err, pageBulkExportJob); + this.handleError(err, pageBulkExportJob); }); } diff --git a/apps/app/src/pages/[[...path]].page.tsx b/apps/app/src/pages/[[...path]].page.tsx index 4d44278345b..04165387a8f 100644 --- a/apps/app/src/pages/[[...path]].page.tsx +++ b/apps/app/src/pages/[[...path]].page.tsx @@ -585,7 +585,8 @@ function injectServerConfigurations(context: GetServerSidePropsContext, props: P props.disableLinkSharing = configManager.getConfig('crowi', 'security:disableLinkSharing'); props.isUploadAllFileAllowed = crowi.fileUploadService.getFileUploadEnabled(); props.isUploadEnabled = crowi.fileUploadService.getIsUploadable(); - props.isPageBulkExportEnabled = PageBulkExportEnabledFileUploadTypes.includes(configManager.getConfig('crowi', 'app:fileUploadType')); + // TODO: allow enabling/disabling bulk export in https://redmine.weseek.co.jp/issues/158221 + props.isPageBulkExportEnabled = true; props.isLocalAccountRegistrationEnabled = crowi.passportService.isLocalStrategySetup && configManager.getConfig('crowi', 'security:registrationMode') !== RegistrationMode.CLOSED; diff --git a/apps/app/src/server/service/file-uploader/aws/index.ts b/apps/app/src/server/service/file-uploader/aws/index.ts index a39266af2a5..a0a9432d152 100644 --- a/apps/app/src/server/service/file-uploader/aws/index.ts +++ b/apps/app/src/server/service/file-uploader/aws/index.ts @@ -1,4 +1,4 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; import type { GetObjectCommandInput, HeadObjectCommandInput } from '@aws-sdk/client-s3'; import { @@ -157,7 +157,7 @@ class AwsFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { if (!this.getIsUploadable()) { throw new Error('AWS is not configured.'); } @@ -172,7 +172,7 @@ class AwsFileUploader extends AbstractFileUploader { await s3.send(new PutObjectCommand({ Bucket: getS3Bucket(), Key: filePath, - Body: readStream, + Body: readable, ACL: getS3PutObjectCannedAcl(), // put type and the file name for reference information when uploading ContentType: contentHeaders.contentType?.value.toString(), diff --git a/apps/app/src/server/service/file-uploader/azure.ts b/apps/app/src/server/service/file-uploader/azure.ts index 654f5ca30e6..5fa6aaed661 100644 --- a/apps/app/src/server/service/file-uploader/azure.ts +++ b/apps/app/src/server/service/file-uploader/azure.ts @@ -1,4 +1,4 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; import type { TokenCredential } from '@azure/identity'; import { ClientSecretCredential } from '@azure/identity'; @@ -102,7 +102,7 @@ class AzureFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { if (!this.getIsUploadable()) { throw new Error('Azure is not configured.'); } @@ -113,7 +113,7 @@ class AzureFileUploader extends AbstractFileUploader { const blockBlobClient: BlockBlobClient = containerClient.getBlockBlobClient(filePath); const contentHeaders = new ContentHeaders(attachment); - await blockBlobClient.uploadStream(readStream, undefined, undefined, { + await blockBlobClient.uploadStream(readable, undefined, undefined, { blobHTTPHeaders: { // put type and the file name for reference information when uploading blobContentType: contentHeaders.contentType?.value.toString(), diff --git a/apps/app/src/server/service/file-uploader/file-uploader.ts b/apps/app/src/server/service/file-uploader/file-uploader.ts index 5677c583b4e..957da5cc1ab 100644 --- a/apps/app/src/server/service/file-uploader/file-uploader.ts +++ b/apps/app/src/server/service/file-uploader/file-uploader.ts @@ -1,4 +1,4 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; import type { Response } from 'express'; import { v4 as uuidv4 } from 'uuid'; @@ -39,7 +39,7 @@ export interface FileUploader { getTotalFileSize(): Promise, doCheckLimit(uploadFileSize: number, maxFileSize: number, totalLimit: number): Promise, determineResponseMode(): ResponseMode, - uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise, + uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise, respond(res: Response, attachment: IAttachmentDocument, opts?: RespondOptions): void, findDeliveryFile(attachment: IAttachmentDocument): Promise, generateTemporaryUrl(attachment: IAttachmentDocument, opts?: RespondOptions): Promise, @@ -164,7 +164,7 @@ export abstract class AbstractFileUploader implements FileUploader { throw new Error('Multipart upload not available for file upload type'); } - abstract uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise; + abstract uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise; /** * Abort an existing multipart upload without creating a MultipartUploader instance diff --git a/apps/app/src/server/service/file-uploader/gcs/index.ts b/apps/app/src/server/service/file-uploader/gcs/index.ts index 1882b5dfd65..4fde4ece518 100644 --- a/apps/app/src/server/service/file-uploader/gcs/index.ts +++ b/apps/app/src/server/service/file-uploader/gcs/index.ts @@ -1,4 +1,5 @@ -import type { ReadStream } from 'fs'; +import type { Readable } from 'stream'; +import { pipeline } from 'stream/promises'; import { Storage } from '@google-cloud/storage'; import axios from 'axios'; @@ -109,7 +110,7 @@ class GcsFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { if (!this.getIsUploadable()) { throw new Error('GCS is not configured.'); } @@ -121,11 +122,12 @@ class GcsFileUploader extends AbstractFileUploader { const filePath = getFilePathOnStorage(attachment); const contentHeaders = new ContentHeaders(attachment); - await myBucket.upload(readStream.path.toString(), { - destination: filePath, + const file = myBucket.file(filePath); + + await pipeline(readable, file.createWriteStream({ // put type and the file name for reference information when uploading contentType: contentHeaders.contentType?.value.toString(), - }); + })); } /** diff --git a/apps/app/src/server/service/file-uploader/gridfs.ts b/apps/app/src/server/service/file-uploader/gridfs.ts index 93f99895cd5..e8b30af5b8c 100644 --- a/apps/app/src/server/service/file-uploader/gridfs.ts +++ b/apps/app/src/server/service/file-uploader/gridfs.ts @@ -1,4 +1,3 @@ -import type { ReadStream } from 'fs'; import { Readable } from 'stream'; import util from 'util'; @@ -62,7 +61,7 @@ class GridfsFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { logger.debug(`File uploading: fileName=${attachment.fileName}`); const contentHeaders = new ContentHeaders(attachment); @@ -73,7 +72,7 @@ class GridfsFileUploader extends AbstractFileUploader { filename: attachment.fileName, contentType: contentHeaders.contentType?.value.toString(), }, - readStream, + readable, ); } diff --git a/apps/app/src/server/service/file-uploader/local.ts b/apps/app/src/server/service/file-uploader/local.ts index f7236a378ba..40eaf6bb2de 100644 --- a/apps/app/src/server/service/file-uploader/local.ts +++ b/apps/app/src/server/service/file-uploader/local.ts @@ -1,4 +1,3 @@ -import type { ReadStream } from 'fs'; import type { Writable } from 'stream'; import { Readable } from 'stream'; import { pipeline } from 'stream/promises'; @@ -76,7 +75,7 @@ class LocalFileUploader extends AbstractFileUploader { /** * @inheritdoc */ - override async uploadAttachment(readStream: ReadStream, attachment: IAttachmentDocument): Promise { + override async uploadAttachment(readable: Readable, attachment: IAttachmentDocument): Promise { throw new Error('Method not implemented.'); }