Skip to content

Commit

Permalink
stream completed and failure events
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeh committed Apr 22, 2024
1 parent 71d4678 commit 2ba40da
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 44 deletions.
8 changes: 6 additions & 2 deletions api/src/modules/events/app-events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ImportProgressHandler } from 'modules/events/import-data/import-progres
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { WebSocketsModule } from 'modules/notifications/websockets/websockets.module';
import { ImportProgressSocket } from 'modules/events/import-data/import-progress.socket';
import { ImportProgressTrackerFactory } from './import-data/import-progress.tracker.factory';
import { ImportProgressTrackerFactory } from 'modules/events/import-data/import-progress.tracker.factory';

@Global()
@Module({
Expand All @@ -15,6 +15,10 @@ import { ImportProgressTrackerFactory } from './import-data/import-progress.trac
ImportProgressSocket,
ImportProgressTrackerFactory,
],
exports: [ImportProgressEmitter, ImportProgressTrackerFactory],
exports: [
ImportProgressEmitter,
ImportProgressTrackerFactory,
ImportProgressSocket,
],
})
export class AppEventsModule {}
2 changes: 0 additions & 2 deletions api/src/modules/events/import-data/import-progress.emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ export class ImportProgressEmitter {
IMPORTING_DATA: 'IMPORTING_DATA',
GEOCODING: 'GEOCODING',
CALCULATING_IMPACT: 'CALCULATING_IMPACT',
FINISHED: 'FINISHED',
FAILED: 'FAILED',
};

constructor(private readonly eventBus: EventBus) {}
Expand Down
14 changes: 1 addition & 13 deletions api/src/modules/events/import-data/import-progress.event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import {
export class ImportProgressUpdateEvent implements IEvent {
stepOrder: ImportProgressSequence = [
'VALIDATING_DATA',
'IMPORTING_DATA',
'GEOCODING',
'IMPORTING_DATA',
'CALCULATING_IMPACT',
'FINISHED',
'FAILED',
];
payload: ImportProgressPayload;

Expand Down Expand Up @@ -41,16 +39,6 @@ export class ImportProgressUpdateEvent implements IEvent {
status: 'idle',
progress: 0,
},
FINISHED: {
step: 'FINISHED',
status: 'idle',
progress: 0,
},
FAILED: {
step: 'FAILED',
status: 'idle',
progress: 0,
},
};
this.updatePayload(step, progress);
}
Expand Down
27 changes: 23 additions & 4 deletions api/src/modules/events/import-data/import-progress.socket.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
import { Inject } from '@nestjs/common';
import { EVENT_KINDS } from 'modules/notifications/websockets/types';
import { IWebSocketServiceToken } from 'modules/notifications/websockets/websockets.module';
import { IWebSocketService } from 'modules/notifications/websockets/websockets.service.interface';
import { ImportProgressPayload } from 'modules/events/import-data/types';

export class ImportProgressSocket {
importDataEventKind: EVENT_KINDS.DATA_IMPORT_PROGRESS =
EVENT_KINDS.DATA_IMPORT_PROGRESS;
importDataEventKind: Record<any, any> = {
DATA_IMPORT_PROGRESS: 'DATA_IMPORT_PROGRESS',
DATA_IMPORT_COMPLETE: 'DATA_IMPORT_COMPLETED',
DATA_IMPORT_FAILURE: 'DATA_IMPORT_FAILURE',
};

constructor(
@Inject(IWebSocketServiceToken)
private readonly websockets: IWebSocketService,
) {}

emitProgressUpdateToSocket(payload: ImportProgressPayload): void {
this.websockets.emit(this.importDataEventKind, payload);
this.websockets.emit(
this.importDataEventKind.DATA_IMPORT_PROGRESS,
payload,
);
}

emitImportCompleteToSocket(completedPayload: any): void {
this.websockets.emit(
this.importDataEventKind.DATA_IMPORT_COMPLETE,
completedPayload,
);
}

emitImportFailureToSocket(failurePayload: any): void {
this.websockets.emit(
this.importDataEventKind.DATA_IMPORT_FAILURE,
failurePayload,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ import { Injectable } from '@nestjs/common';
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { GeoCodingProgressTracker } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker';
import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker';
import { ValidationProgressTracker } from 'modules/import-data/progress-tracker/validation.progress-tracker';
import { SourcingDataImportProgressTracker } from 'modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker';

// TODO: It would be nice to implement a starting point for all trackers so we have more freedom to place the progress

@Injectable()
export class ImportProgressTrackerFactory {
constructor(public readonly importProgressEmitter: ImportProgressEmitter) {
this.importProgressEmitter = importProgressEmitter;
}

createValidationProgressTracker(validationOptions: {
totalSteps: number;
}): ValidationProgressTracker {
return new ValidationProgressTracker(this.importProgressEmitter, {
totalSteps: validationOptions.totalSteps,
});
}

createGeoCodingTracker(geoCodeTrackingOptions: {
totalLocations: number;
}): GeoCodingProgressTracker {
Expand Down
8 changes: 2 additions & 6 deletions api/src/modules/events/import-data/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ export type ImportProgressSteps =
| 'VALIDATING_DATA'
| 'IMPORTING_DATA'
| 'GEOCODING'
| 'CALCULATING_IMPACT'
| 'FINISHED'
| 'FAILED';
| 'CALCULATING_IMPACT';

export type ImportProgressSequence = [
'VALIDATING_DATA',
'IMPORTING_DATA',
'GEOCODING',
'IMPORTING_DATA',
'CALCULATING_IMPACT',
'FINISHED',
'FAILED',
];

type StepStatus = 'idle' | 'processing' | 'completed';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';

export class ValidationProgressTracker {
totalSteps: number;
progress: number = 0;

constructor(
public readonly importProgressEmitter: ImportProgressEmitter,
trackingOptions: {
totalSteps: number;
},
) {
this.importProgressEmitter = importProgressEmitter;
this.totalSteps = trackingOptions.totalSteps;
}

trackProgress(): void {
this.progress++;

this.importProgressEmitter.emitValidationProgress({
progress: this.getProgress(),
});
}

private getProgress(): number {
return (this.progress / this.totalSteps) * 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import { IndicatorsService } from 'modules/indicators/indicators.service';
import { Indicator } from 'modules/indicators/indicator.entity';
import { ImpactService } from 'modules/impact/impact.service';
import { ImpactCalculator } from 'modules/indicator-records/services/impact-calculator.service';
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { ImportProgressTrackerFactory } from 'modules/events/import-data/import-progress.tracker.factory';
import { ValidationProgressTracker } from 'modules/import-data/progress-tracker/validation.progress-tracker';

export interface LocationData {
locationAddressInput?: string;
Expand Down Expand Up @@ -82,7 +83,7 @@ export class SourcingDataImportService {
protected readonly indicatorRecordService: IndicatorRecordsService,
protected readonly impactService: ImpactService,
protected readonly impactCalculator: ImpactCalculator,
protected readonly importProgress: ImportProgressEmitter,
protected readonly importProgressTrackerFactory: ImportProgressTrackerFactory,
) {}

async importSourcingData(filePath: string, taskId: string): Promise<any> {
Expand Down Expand Up @@ -340,7 +341,11 @@ export class SourcingDataImportService {
const results: any = {} as SourcingRecordsDtos;

const totalSteps: number = Object.keys(parsedXLSXDataset).length + 1; // +1 for final validation step
let currentStep: number = 0;

const tracker: ValidationProgressTracker =
this.importProgressTrackerFactory.createValidationProgressTracker({
totalSteps: totalSteps,
});

for (const [sheetName, sheetEntities] of Object.entries(
parsedXLSXDataset,
Expand All @@ -354,10 +359,7 @@ export class SourcingDataImportService {
} else {
await this.validateDTOs(results);
}
currentStep++;
this.importProgress.emitValidationProgress({
progress: (currentStep / totalSteps) * 100,
});
tracker.trackProgress();
}

return results;
Expand Down
11 changes: 4 additions & 7 deletions api/src/modules/import-data/workers/import-data.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { TasksService } from 'modules/tasks/tasks.service';
import { Task, TASK_STATUS } from 'modules/tasks/task.entity';
import { importQueueName } from 'modules/import-data/workers/import-queue.name';
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { ImportProgressSocket } from '../../events/import-data/import-progress.socket';

@Processor(importQueueName)
export class ImportDataConsumer {
Expand All @@ -21,7 +22,7 @@ export class ImportDataConsumer {
constructor(
public readonly importDataService: ImportDataService,
public readonly tasksService: TasksService,
public readonly importProgress: ImportProgressEmitter,
public readonly importSocket: ImportProgressSocket,
) {}

@OnQueueError()
Expand All @@ -40,7 +41,7 @@ export class ImportDataConsumer {
newStatus: TASK_STATUS.FAILED,
message: err.message,
});
// this.importProgress.emitImportFailed();
this.importSocket.emitImportFailureToSocket({ error: err });
this.logger.error(
`Import Failed for file: ${job.data.xlsxFileData.filename} for task: ${task.id}: ${err}`,
);
Expand All @@ -55,15 +56,11 @@ export class ImportDataConsumer {
taskId: job.data.taskId,
newStatus: TASK_STATUS.COMPLETED,
});
// this.importProgress.emitImportFinished();
this.importSocket.emitImportCompleteToSocket({ status: 'completed' });
}

@Process('excel-import-job')
async readImportDataJob(job: Job<ExcelImportJob>): Promise<void> {
await this.importDataService.processImportJob(job);
this.importProgress.emitValidationProgress({
taskId: job.data.taskId,
progress: 0,
});
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger, UseGuards } from '@nestjs/common';
import { Logger } from '@nestjs/common';
import {
WebSocketGateway,
WebSocketServer,
Expand Down Expand Up @@ -29,8 +29,8 @@ export class NestWebsocketsService
}

handleConnection(client: any, ...args: any[]): any {
this.logger.warn('token in query', client.handshake.query.token);
this.logger.warn('token in header', client.handshake.header);
this.logger.log(`Client connected: ${client.id}`);
this.logger.warn('token in header', client.handshake.headers);
}

emit(event: EVENT_KINDS, payload: any): void {
Expand Down

0 comments on commit 2ba40da

Please sign in to comment.