From 2ba40da4dc836362e41dd36a4fdea88cafc87699 Mon Sep 17 00:00:00 2001 From: alexeh Date: Mon, 22 Apr 2024 16:10:07 +0300 Subject: [PATCH] stream completed and failure events --- api/src/modules/events/app-events.module.ts | 8 ++++-- .../import-data/import-progress.emitter.ts | 2 -- .../import-data/import-progress.event.ts | 14 +--------- .../import-data/import-progress.socket.ts | 27 +++++++++++++++--- .../import-progress.tracker.factory.ts | 11 ++++++++ api/src/modules/events/import-data/types.ts | 8 ++---- .../validation.progress-tracker.ts | 28 +++++++++++++++++++ .../sourcing-data-import.service.ts | 16 ++++++----- .../workers/import-data.consumer.ts | 11 +++----- .../websockets/websockets.service.ts | 6 ++-- 10 files changed, 87 insertions(+), 44 deletions(-) create mode 100644 api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts diff --git a/api/src/modules/events/app-events.module.ts b/api/src/modules/events/app-events.module.ts index a25b74b5e..16006bfe8 100644 --- a/api/src/modules/events/app-events.module.ts +++ b/api/src/modules/events/app-events.module.ts @@ -4,7 +4,7 @@ import { ImportProgressHandler } from 'modules/events/import-data/import-progres import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter'; import { WebSocketsModule } from 'modules/notifications/websockets/websockets.module'; import { ImportProgressSocket } from 'modules/events/import-data/import-progress.socket'; -import { ImportProgressTrackerFactory } from './import-data/import-progress.tracker.factory'; +import { ImportProgressTrackerFactory } from 'modules/events/import-data/import-progress.tracker.factory'; @Global() @Module({ @@ -15,6 +15,10 @@ import { ImportProgressTrackerFactory } from './import-data/import-progress.trac ImportProgressSocket, ImportProgressTrackerFactory, ], - exports: [ImportProgressEmitter, ImportProgressTrackerFactory], + exports: [ + ImportProgressEmitter, + ImportProgressTrackerFactory, + ImportProgressSocket, + ], }) export class AppEventsModule {} diff --git a/api/src/modules/events/import-data/import-progress.emitter.ts b/api/src/modules/events/import-data/import-progress.emitter.ts index 826c4f7d0..e359777e0 100644 --- a/api/src/modules/events/import-data/import-progress.emitter.ts +++ b/api/src/modules/events/import-data/import-progress.emitter.ts @@ -14,8 +14,6 @@ export class ImportProgressEmitter { IMPORTING_DATA: 'IMPORTING_DATA', GEOCODING: 'GEOCODING', CALCULATING_IMPACT: 'CALCULATING_IMPACT', - FINISHED: 'FINISHED', - FAILED: 'FAILED', }; constructor(private readonly eventBus: EventBus) {} diff --git a/api/src/modules/events/import-data/import-progress.event.ts b/api/src/modules/events/import-data/import-progress.event.ts index b925dbd1b..74b2ae3ad 100644 --- a/api/src/modules/events/import-data/import-progress.event.ts +++ b/api/src/modules/events/import-data/import-progress.event.ts @@ -8,11 +8,9 @@ import { export class ImportProgressUpdateEvent implements IEvent { stepOrder: ImportProgressSequence = [ 'VALIDATING_DATA', - 'IMPORTING_DATA', 'GEOCODING', + 'IMPORTING_DATA', 'CALCULATING_IMPACT', - 'FINISHED', - 'FAILED', ]; payload: ImportProgressPayload; @@ -41,16 +39,6 @@ export class ImportProgressUpdateEvent implements IEvent { status: 'idle', progress: 0, }, - FINISHED: { - step: 'FINISHED', - status: 'idle', - progress: 0, - }, - FAILED: { - step: 'FAILED', - status: 'idle', - progress: 0, - }, }; this.updatePayload(step, progress); } diff --git a/api/src/modules/events/import-data/import-progress.socket.ts b/api/src/modules/events/import-data/import-progress.socket.ts index 5c28a0673..4a0134d95 100644 --- a/api/src/modules/events/import-data/import-progress.socket.ts +++ b/api/src/modules/events/import-data/import-progress.socket.ts @@ -1,12 +1,14 @@ import { Inject } from '@nestjs/common'; -import { EVENT_KINDS } from 'modules/notifications/websockets/types'; import { IWebSocketServiceToken } from 'modules/notifications/websockets/websockets.module'; import { IWebSocketService } from 'modules/notifications/websockets/websockets.service.interface'; import { ImportProgressPayload } from 'modules/events/import-data/types'; export class ImportProgressSocket { - importDataEventKind: EVENT_KINDS.DATA_IMPORT_PROGRESS = - EVENT_KINDS.DATA_IMPORT_PROGRESS; + importDataEventKind: Record = { + DATA_IMPORT_PROGRESS: 'DATA_IMPORT_PROGRESS', + DATA_IMPORT_COMPLETE: 'DATA_IMPORT_COMPLETED', + DATA_IMPORT_FAILURE: 'DATA_IMPORT_FAILURE', + }; constructor( @Inject(IWebSocketServiceToken) @@ -14,6 +16,23 @@ export class ImportProgressSocket { ) {} emitProgressUpdateToSocket(payload: ImportProgressPayload): void { - this.websockets.emit(this.importDataEventKind, payload); + this.websockets.emit( + this.importDataEventKind.DATA_IMPORT_PROGRESS, + payload, + ); + } + + emitImportCompleteToSocket(completedPayload: any): void { + this.websockets.emit( + this.importDataEventKind.DATA_IMPORT_COMPLETE, + completedPayload, + ); + } + + emitImportFailureToSocket(failurePayload: any): void { + this.websockets.emit( + this.importDataEventKind.DATA_IMPORT_FAILURE, + failurePayload, + ); } } diff --git a/api/src/modules/events/import-data/import-progress.tracker.factory.ts b/api/src/modules/events/import-data/import-progress.tracker.factory.ts index f1d2125d2..6bb9b4da5 100644 --- a/api/src/modules/events/import-data/import-progress.tracker.factory.ts +++ b/api/src/modules/events/import-data/import-progress.tracker.factory.ts @@ -2,14 +2,25 @@ import { Injectable } from '@nestjs/common'; import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter'; import { GeoCodingProgressTracker } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker'; import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker'; +import { ValidationProgressTracker } from 'modules/import-data/progress-tracker/validation.progress-tracker'; import { SourcingDataImportProgressTracker } from 'modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker'; +// TODO: It would be nice to implement a starting point for all trackers so we have more freedom to place the progress + @Injectable() export class ImportProgressTrackerFactory { constructor(public readonly importProgressEmitter: ImportProgressEmitter) { this.importProgressEmitter = importProgressEmitter; } + createValidationProgressTracker(validationOptions: { + totalSteps: number; + }): ValidationProgressTracker { + return new ValidationProgressTracker(this.importProgressEmitter, { + totalSteps: validationOptions.totalSteps, + }); + } + createGeoCodingTracker(geoCodeTrackingOptions: { totalLocations: number; }): GeoCodingProgressTracker { diff --git a/api/src/modules/events/import-data/types.ts b/api/src/modules/events/import-data/types.ts index 5b38567d3..eabb2e23f 100644 --- a/api/src/modules/events/import-data/types.ts +++ b/api/src/modules/events/import-data/types.ts @@ -2,17 +2,13 @@ export type ImportProgressSteps = | 'VALIDATING_DATA' | 'IMPORTING_DATA' | 'GEOCODING' - | 'CALCULATING_IMPACT' - | 'FINISHED' - | 'FAILED'; + | 'CALCULATING_IMPACT'; export type ImportProgressSequence = [ 'VALIDATING_DATA', - 'IMPORTING_DATA', 'GEOCODING', + 'IMPORTING_DATA', 'CALCULATING_IMPACT', - 'FINISHED', - 'FAILED', ]; type StepStatus = 'idle' | 'processing' | 'completed'; diff --git a/api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts b/api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts new file mode 100644 index 000000000..5e195c045 --- /dev/null +++ b/api/src/modules/import-data/progress-tracker/validation.progress-tracker.ts @@ -0,0 +1,28 @@ +import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter'; + +export class ValidationProgressTracker { + totalSteps: number; + progress: number = 0; + + constructor( + public readonly importProgressEmitter: ImportProgressEmitter, + trackingOptions: { + totalSteps: number; + }, + ) { + this.importProgressEmitter = importProgressEmitter; + this.totalSteps = trackingOptions.totalSteps; + } + + trackProgress(): void { + this.progress++; + + this.importProgressEmitter.emitValidationProgress({ + progress: this.getProgress(), + }); + } + + private getProgress(): number { + return (this.progress / this.totalSteps) * 100; + } +} diff --git a/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts b/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts index 1965c7da0..5d7ee74a7 100644 --- a/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts +++ b/api/src/modules/import-data/sourcing-data/sourcing-data-import.service.ts @@ -31,7 +31,8 @@ import { IndicatorsService } from 'modules/indicators/indicators.service'; import { Indicator } from 'modules/indicators/indicator.entity'; import { ImpactService } from 'modules/impact/impact.service'; import { ImpactCalculator } from 'modules/indicator-records/services/impact-calculator.service'; -import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter'; +import { ImportProgressTrackerFactory } from 'modules/events/import-data/import-progress.tracker.factory'; +import { ValidationProgressTracker } from 'modules/import-data/progress-tracker/validation.progress-tracker'; export interface LocationData { locationAddressInput?: string; @@ -82,7 +83,7 @@ export class SourcingDataImportService { protected readonly indicatorRecordService: IndicatorRecordsService, protected readonly impactService: ImpactService, protected readonly impactCalculator: ImpactCalculator, - protected readonly importProgress: ImportProgressEmitter, + protected readonly importProgressTrackerFactory: ImportProgressTrackerFactory, ) {} async importSourcingData(filePath: string, taskId: string): Promise { @@ -340,7 +341,11 @@ export class SourcingDataImportService { const results: any = {} as SourcingRecordsDtos; const totalSteps: number = Object.keys(parsedXLSXDataset).length + 1; // +1 for final validation step - let currentStep: number = 0; + + const tracker: ValidationProgressTracker = + this.importProgressTrackerFactory.createValidationProgressTracker({ + totalSteps: totalSteps, + }); for (const [sheetName, sheetEntities] of Object.entries( parsedXLSXDataset, @@ -354,10 +359,7 @@ export class SourcingDataImportService { } else { await this.validateDTOs(results); } - currentStep++; - this.importProgress.emitValidationProgress({ - progress: (currentStep / totalSteps) * 100, - }); + tracker.trackProgress(); } return results; diff --git a/api/src/modules/import-data/workers/import-data.consumer.ts b/api/src/modules/import-data/workers/import-data.consumer.ts index adc27c849..7ea4796c7 100644 --- a/api/src/modules/import-data/workers/import-data.consumer.ts +++ b/api/src/modules/import-data/workers/import-data.consumer.ts @@ -13,6 +13,7 @@ import { TasksService } from 'modules/tasks/tasks.service'; import { Task, TASK_STATUS } from 'modules/tasks/task.entity'; import { importQueueName } from 'modules/import-data/workers/import-queue.name'; import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter'; +import { ImportProgressSocket } from '../../events/import-data/import-progress.socket'; @Processor(importQueueName) export class ImportDataConsumer { @@ -21,7 +22,7 @@ export class ImportDataConsumer { constructor( public readonly importDataService: ImportDataService, public readonly tasksService: TasksService, - public readonly importProgress: ImportProgressEmitter, + public readonly importSocket: ImportProgressSocket, ) {} @OnQueueError() @@ -40,7 +41,7 @@ export class ImportDataConsumer { newStatus: TASK_STATUS.FAILED, message: err.message, }); - // this.importProgress.emitImportFailed(); + this.importSocket.emitImportFailureToSocket({ error: err }); this.logger.error( `Import Failed for file: ${job.data.xlsxFileData.filename} for task: ${task.id}: ${err}`, ); @@ -55,15 +56,11 @@ export class ImportDataConsumer { taskId: job.data.taskId, newStatus: TASK_STATUS.COMPLETED, }); - // this.importProgress.emitImportFinished(); + this.importSocket.emitImportCompleteToSocket({ status: 'completed' }); } @Process('excel-import-job') async readImportDataJob(job: Job): Promise { await this.importDataService.processImportJob(job); - this.importProgress.emitValidationProgress({ - taskId: job.data.taskId, - progress: 0, - }); } } diff --git a/api/src/modules/notifications/websockets/websockets.service.ts b/api/src/modules/notifications/websockets/websockets.service.ts index 929b879f3..f9f598fe9 100644 --- a/api/src/modules/notifications/websockets/websockets.service.ts +++ b/api/src/modules/notifications/websockets/websockets.service.ts @@ -1,4 +1,4 @@ -import { Logger, UseGuards } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { WebSocketGateway, WebSocketServer, @@ -29,8 +29,8 @@ export class NestWebsocketsService } handleConnection(client: any, ...args: any[]): any { - this.logger.warn('token in query', client.handshake.query.token); - this.logger.warn('token in header', client.handshake.header); + this.logger.log(`Client connected: ${client.id}`); + this.logger.warn('token in header', client.handshake.headers); } emit(event: EVENT_KINDS, payload: any): void {