Skip to content

Commit

Permalink
use factory to create step trackers
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeh committed Apr 22, 2024
1 parent 935b254 commit 36efdc2
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 85 deletions.
4 changes: 3 additions & 1 deletion api/src/modules/events/app-events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ImportProgressHandler } from 'modules/events/import-data/import-progres
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { WebSocketsModule } from 'modules/notifications/websockets/websockets.module';
import { ImportProgressSocket } from 'modules/events/import-data/import-progress.socket';
import { ImportProgressTrackerFactory } from './import-data/import-progress.tracker.factory';

@Global()
@Module({
Expand All @@ -12,7 +13,8 @@ import { ImportProgressSocket } from 'modules/events/import-data/import-progress
ImportProgressHandler,
ImportProgressEmitter,
ImportProgressSocket,
ImportProgressTrackerFactory,
],
exports: [ImportProgressEmitter],
exports: [ImportProgressEmitter, ImportProgressTrackerFactory],
})
export class AppEventsModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { Injectable } from '@nestjs/common';
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { GeoCodingProgressTracker } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker';
import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker';
import { SourcingDataImportProgressTracker } from 'modules/sourcing-locations/progress-tracker/sourcing-data.progress-tracker';

@Injectable()
export class ImportProgressTrackerFactory {
constructor(public readonly importProgressEmitter: ImportProgressEmitter) {
this.importProgressEmitter = importProgressEmitter;
}

createGeoCodingTracker(geoCodeTrackingOptions: {
totalLocations: number;
}): GeoCodingProgressTracker {
return new GeoCodingProgressTracker(
this.importProgressEmitter,
geoCodeTrackingOptions,
);
}

createSourcingDataImportTracker(sourcingDataImportOptions: {
totalRecords: number;
totalChunks: number;
}): SourcingDataImportProgressTracker {
return new SourcingDataImportProgressTracker(this.importProgressEmitter, {
totalRecords: sourcingDataImportOptions.totalRecords,
totalChunks: sourcingDataImportOptions.totalChunks,
});
}

createImpactCalculationProgressTracker(impactCalculationOptions: {
totalRecords: number;
totalChunks: number;
startingPercentage?: number;
}): ImpactCalculationProgressTracker {
return new ImpactCalculationProgressTracker(
this.importProgressEmitter,
impactCalculationOptions,
);
}
}
2 changes: 0 additions & 2 deletions api/src/modules/geo-coding/geo-coding.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
import { GoogleMapsGeocoder } from 'modules/geo-coding/geocoders/google-maps.geocoder';
import * as redisStore from 'cache-manager-redis-store';
import * as config from 'config';
import { GeoCodingProgressTrackerFactory } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker.factory';

const geocodingCacheConfig: any = config.get('geocodingCache');

Expand Down Expand Up @@ -61,7 +60,6 @@ const geocodingCacheEnabled: boolean =
AggregationPointGeocodingStrategy,
PointOfProductionGeocodingStrategy,
AdminRegionOfProductionService,
GeoCodingProgressTrackerFactory,
],
exports: [GeoCodingAbstractClass],
})
Expand Down
6 changes: 3 additions & 3 deletions api/src/modules/geo-coding/geo-coding.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import { GeoCodingAbstractClass } from 'modules/geo-coding/geo-coding-abstract-class';
import { AdminRegionOfProductionService } from 'modules/geo-coding/strategies/admin-region-of-production.service';
import { GeoCodingProgressTracker } from 'modules/geo-coding/progress-tracker/geo-coding.progress-tracker';
import { GeoCodingProgressTrackerFactory } from './progress-tracker/geo-coding.progress-tracker.factory';
import { ImportProgressTrackerFactory } from '../events/import-data/import-progress.tracker.factory';

interface locationInfo {
locationAddressInput?: string;
Expand All @@ -32,7 +32,7 @@ export class GeoCodingService extends GeoCodingAbstractClass {
protected readonly countryOfProductionService: CountryOfProductionGeoCodingStrategy,
protected readonly unknownLocationService: UnknownLocationGeoCodingStrategy,
protected readonly adminRegionOfProductionService: AdminRegionOfProductionService,
protected readonly progressTrackerFactory: GeoCodingProgressTrackerFactory,
protected readonly progressTrackerFactory: ImportProgressTrackerFactory,
) {
super();
}
Expand All @@ -47,7 +47,7 @@ export class GeoCodingService extends GeoCodingAbstractClass {
const errors: any[] = [];
const totalLocations: number = sourcingData.length;
const progressTracker: GeoCodingProgressTracker =
this.progressTrackerFactory.createTracker({ totalLocations });
this.progressTrackerFactory.createGeoCodingTracker({ totalLocations });
for (let i: number = 0; i < totalLocations; i++) {
const location: SourcingData = sourcingData[i];
this.logger.debug(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';

export class ImpactCalculationProgressTracker {
totalRecords: number;
progress: number = 0;
progressPerChunk: number;
private interval: NodeJS.Timer | null = null;

constructor(
private readonly importProgressEmitter: ImportProgressEmitter,
private readonly importTrackInfo: {
totalRecords: number;
totalChunks: number;
startingPercentage?: number;
estimatedTime?: number;
},
) {
this.importProgressEmitter = importProgressEmitter;
this.totalRecords = importTrackInfo.totalRecords;
const startingPercentage: number = importTrackInfo.startingPercentage ?? 0;
this.progressPerChunk =
(100 - startingPercentage) / importTrackInfo.totalChunks;
}

trackProgress(): void {
this.progress += this.progressPerChunk;

this.importProgressEmitter.emitImpactCalculationProgress({
progress: this.getProgress(),
});
}

private getProgress(): number {
return this.progress;
}

startProgressInterval(progressIncrement: number, maxProgress: number): void {
if (this.interval) {
clearInterval(this.interval);
}

this.interval = setInterval(() => {
this.progress += progressIncrement;
this.progress = Math.min(this.progress, maxProgress);
this.importProgressEmitter.emitImpactCalculationProgress({
progress: this.getProgress(),
});

if (this.progress >= maxProgress) {
this.stopProgressInterval();
}
}, 1000);
}

stopProgressInterval(): void {
if (this.interval) {
clearInterval(this.interval);
this.interval = null;
}
}
}
61 changes: 59 additions & 2 deletions api/src/modules/indicator-records/indicator-record.repository.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,70 @@
import { DataSource } from 'typeorm';
import { DataSource, QueryRunner } from 'typeorm';
import { IndicatorRecord } from 'modules/indicator-records/indicator-record.entity';
import { Injectable, Logger } from '@nestjs/common';
import { AppBaseRepository } from 'utils/app-base.repository';
import { SaveOptions } from 'typeorm/repository/SaveOptions';
import { chunk } from 'lodash';
import { AppConfig } from 'utils/app.config';
import { ImportProgressTrackerFactory } from 'modules/events/import-data/import-progress.tracker.factory';
import { ImpactCalculationProgressTracker } from 'modules/impact/progress-tracker/impact-calculation.progress-tracker';

const dbConfig: any = AppConfig.get('db');
const batchChunkSize: number = parseInt(`${dbConfig.batchChunkSize}`, 10);

@Injectable()
export class IndicatorRecordRepository extends AppBaseRepository<IndicatorRecord> {
constructor(protected dataSource: DataSource) {
constructor(
protected dataSource: DataSource,
private readonly importProgressTrackerFactory: ImportProgressTrackerFactory,
) {
super(IndicatorRecord, dataSource.createEntityManager());
}

logger: Logger = new Logger(IndicatorRecordRepository.name);

async saveChunks<IndicatorRecord>(
entities: IndicatorRecord[],
options?: SaveOptions,
): Promise<IndicatorRecord[]> {
const queryRunner: QueryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
const result: IndicatorRecord[][] = [];
const totalEntities: number = entities.length;
const totalChunks: number = Math.ceil(totalEntities / batchChunkSize);
const tracker: ImpactCalculationProgressTracker =
this.importProgressTrackerFactory.createImpactCalculationProgressTracker({
totalRecords: totalEntities,
totalChunks: totalChunks,
startingPercentage: 50,
});

try {
for (const [index, dataChunk] of chunk(
entities,
batchChunkSize,
).entries()) {
this.logger.debug(
`Inserting chunk #${index} (${dataChunk.length} items) from a total of ${totalChunks}...`,
);
const promises: Promise<IndicatorRecord>[] = dataChunk.map(
(row: IndicatorRecord) => queryRunner.manager.save(row, options),
);
const saved: IndicatorRecord[] = await Promise.all(promises);
result.push(saved);
tracker.trackProgress();
}

// commit transaction if every chunk was saved successfully
await queryRunner.commitTransaction();
} catch (err) {
// rollback changes before throwing error
await queryRunner.rollbackTransaction();
throw err;
} finally {
// release query runner which is manually created
await queryRunner.release();
}
return result.flat();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import {
CACHED_DATA_TYPE,
CachedData,
} from 'modules/cached-data/cached-data.entity';
import { ImportProgressEmitter } from 'modules/cqrs/import-data/import-progress.emitter';
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';
import { ImpactCalculationProgressTracker } from '../../impact/progress-tracker/impact-calculation.progress-tracker';
import { ImportProgressTrackerFactory } from '../../events/import-data/import-progress.tracker.factory';

/**
* @description: This is PoC (Proof of Concept) for the updated LG methodology v0.1
Expand All @@ -50,35 +52,42 @@ export class ImpactCalculator {
private readonly materialToH3: MaterialsToH3sService,
private readonly indicatorService: IndicatorsService,
private readonly dependencyManager: IndicatorQueryDependencyManager,
private readonly h3DataService: H3DataService,
private readonly cachedDataService: CachedDataService,
private readonly dataSource: DataSource,
private readonly importProgress: ImportProgressEmitter,
private readonly importProgressTrackerFactory: ImportProgressTrackerFactory,
) {}

async calculateImpactForAllSourcingRecords(
activeIndicators: Indicator[],
): Promise<void> {
const totalEstimatedTime: number = 600000; // 10 minutos en milisegundos
const totalEstimatedTime: number = 600000; // 10 minutos
const halfTime: number = totalEstimatedTime / 2; // La mitad del tiempo para esta tarea
let progress: number = 0;
const progressIncrement: number = 50 / (halfTime / 1000); // Cálculo para actualizar cada segundo hacia el 50% del total

const interval: NodeJS.Timer = setInterval(() => {
progress += progressIncrement;
progress = Math.min(progress, 50);
this.importProgress.emitImpactCalculationProgress({ progress });
if (progress >= 50) {
clearInterval(interval);
}
}, 1000);
const progressIncrement: number = 50 / (halfTime / 1000); // Cálculo para incrementar al 50%
const tracker: ImpactCalculationProgressTracker =
this.importProgressTrackerFactory.createImpactCalculationProgressTracker({
totalRecords: 1,
totalChunks: 1,
startingPercentage: 0,
});

// const interval: NodeJS.Timer = setInterval(() => {
// progress += progressIncrement;
// progress = Math.min(progress, 50);
// this.importProgress.emitImpactCalculationProgress({ progress });
// if (progress >= 50) {
// clearInterval(interval);
// }
// }, 1000);
tracker.startProgressInterval(progressIncrement, 50);
let rawData: SourcingRecordsWithIndicatorRawData[];
try {
rawData = await this.getImpactRawDataForAllSourcingRecords(
activeIndicators,
);
tracker.stopProgressInterval();
} catch (error: any) {
clearInterval(interval);
tracker.stopProgressInterval();
throw error;
}

Expand All @@ -89,7 +98,6 @@ export class ImpactCalculator {
this.calculateIndicatorValues(data, data.tonnage);

activeIndicators.forEach((indicator: Indicator) => {
this.logger.log('CREATING INDICATOR RECORD DTO');
newImpactToBeSaved.push(
IndicatorRecord.merge(new IndicatorRecord(), {
value: indicatorValues.get(indicator.nameCode),
Expand All @@ -103,14 +111,7 @@ export class ImpactCalculator {
});
});

await this.indicatorRecordRepository.saveChunks(
newImpactToBeSaved,
{},
{
step: 'CALCULATING_IMPACT',
progressStartingPoint: 50,
},
);
await this.indicatorRecordRepository.saveChunks(newImpactToBeSaved);
}

async createIndicatorRecordsBySourcingRecords(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { ImportProgressEmitter } from 'modules/events/import-data/import-progress.emitter';

export class SourcingDataImportProgressTracker {
totalRecords: number;
progress: number = 0;
progressPerChunk: number;

constructor(
private readonly importProgressEmitter: ImportProgressEmitter,
private readonly importTrackInfo: {
totalRecords: number;
totalChunks: number;
},
) {
this.importProgressEmitter = importProgressEmitter;
this.totalRecords = importTrackInfo.totalRecords;
this.progressPerChunk = (100 - 50) / importTrackInfo.totalChunks;
}

trackProgress(): void {
this.progress += this.progressPerChunk;

this.importProgressEmitter.emitGeocodingProgress({
progress: this.getProgress(),
});
}

private getProgress(): number {
return this.progress;
}
}
Loading

0 comments on commit 36efdc2

Please sign in to comment.