Skip to content

Commit

Permalink
Refactor import process and testing framework
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeh committed May 5, 2024
1 parent 7deaf60 commit 7b0f826
Show file tree
Hide file tree
Showing 15 changed files with 523 additions and 229 deletions.
46 changes: 23 additions & 23 deletions api/src/modules/import-data/eudr/eudr.import.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class EudrImportService {
title: 'Sourcing Records import from EUDR input file',
});

await this.cleanDataBeforeImport();
//await this.cleanDataBeforeImport();

// TODO: Check what do we need to do with indicators and specially materials:
// Do we need to ingest new materials? Activate some through the import? Activate all?
Expand Down Expand Up @@ -193,26 +193,26 @@ export class EudrImportService {
throw new BadRequestException(validationErrorArray);
}

/**
* @note: Deletes DB content from required entities
* to ensure DB is prune prior loading a XLSX dataset
*/
async cleanDataBeforeImport(): Promise<void> {
this.logger.log('Cleaning database before import...');
try {
await this.indicatorService.deactivateAllIndicators();
await this.materialService.deactivateAllMaterials();
await this.scenarioService.clearTable();
await this.indicatorRecordService.clearTable();
await this.businessUnitService.clearTable();
await this.supplierService.clearTable();
await this.sourcingLocationService.clearTable();
await this.sourcingRecordService.clearTable();
await this.geoRegionsService.deleteGeoRegionsCreatedByUser();
} catch (e: any) {
throw new Error(
`Database could not been cleaned before loading new dataset: ${e.message}`,
);
}
}
// /**
// * @note: Deletes DB content from required entities
// * to ensure DB is prune prior loading a XLSX dataset
// */
// async cleanDataBeforeImport(): Promise<void> {
// this.logger.log('Cleaning database before import...');
// try {
// await this.indicatorService.deactivateAllIndicators();
// await this.materialService.deactivateAllMaterials();
// await this.scenarioService.clearTable();
// await this.indicatorRecordService.clearTable();
// await this.businessUnitService.clearTable();
// await this.supplierService.clearTable();
// await this.sourcingLocationService.clearTable();
// await this.sourcingRecordService.clearTable();
// await this.geoRegionsService.deleteGeoRegionsCreatedByUser();
// } catch (e: any) {
// throw new Error(
// `Database could not been cleaned before loading new dataset: ${e.message}`,
// );
// }
// }
}
19 changes: 2 additions & 17 deletions api/src/modules/import-data/import-data.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,28 @@ import { ImportDataController } from 'modules/import-data/import-data.controller
import { MaterialsModule } from 'modules/materials/materials.module';
import { BusinessUnitsModule } from 'modules/business-units/business-units.module';
import { SuppliersModule } from 'modules/suppliers/suppliers.module';
import { AdminRegionsModule } from 'modules/admin-regions/admin-regions.module';
import { SourcingLocationsModule } from 'modules/sourcing-locations/sourcing-locations.module';
import { SourcingRecordsModule } from 'modules/sourcing-records/sourcing-records.module';
import { SourcingLocationGroupsModule } from 'modules/sourcing-location-groups/sourcing-location-groups.module';
import { FileService } from 'modules/import-data/file.service';
import { SourcingDataImportService } from 'modules/import-data/sourcing-data/sourcing-data-import.service';
import { SourcingRecordsDtoProcessorService } from 'modules/import-data/sourcing-data/dto-processor.service';
import { GeoCodingModule } from 'modules/geo-coding/geo-coding.module';
import { GeoRegionsModule } from 'modules/geo-regions/geo-regions.module';
import { IndicatorRecordsModule } from 'modules/indicator-records/indicator-records.module';
import { BullModule } from '@nestjs/bull';
import { ImportDataProducer } from 'modules/import-data/workers/import-data.producer';
import { ImportDataConsumer } from 'modules/import-data/workers/import-data.consumer';
import { ImportDataService } from 'modules/import-data/import-data.service';
import { TasksModule } from 'modules/tasks/tasks.module';
import { importQueueName } from 'modules/import-data/workers/import-queue.name';
import { ScenariosModule } from 'modules/scenarios/scenarios.module';
import { IndicatorsModule } from 'modules/indicators/indicators.module';
import { MulterModule } from '@nestjs/platform-express';
import * as config from 'config';
import MulterConfigService from 'modules/import-data/multer-config.service';
import { ImpactModule } from 'modules/impact/impact.module';
import { WebSocketsModule } from 'modules/notifications/websockets/websockets.module';
import { EudrImportService } from 'modules/import-data/eudr/eudr.import.service';
import { EUDRDTOProcessor } from 'modules/import-data/eudr/eudr.dto-processor.service';
import { ImportMailService } from 'modules/import-data/import-mail/import-mail.service';
import { NotificationsModule } from 'modules/notifications/notifications.module';
import { ExcelValidatorService } from 'modules/import-data/sourcing-data/validation/excel-validator.service';
import { SourcingDataDbCleaner } from 'modules/import-data/sourcing-data/sourcing-data.db-cleaner';

// TODO: Move EUDR related stuff to EUDR modules

Expand All @@ -46,21 +40,13 @@ import { ExcelValidatorService } from 'modules/import-data/sourcing-data/validat
BullModule.registerQueue({
name: 'eudr',
}),
BullModule.registerQueue({
name: 'eudr',
}),
MaterialsModule,
BusinessUnitsModule,
SuppliersModule,
AdminRegionsModule,
SourcingLocationsModule,
SourcingRecordsModule,
SourcingLocationGroupsModule,
GeoCodingModule,
GeoRegionsModule,
IndicatorRecordsModule,
TasksModule,
ScenariosModule,
IndicatorsModule,
ImpactModule,
WebSocketsModule,
Expand All @@ -74,10 +60,9 @@ import { ExcelValidatorService } from 'modules/import-data/sourcing-data/validat
ImportDataProducer,
ImportDataConsumer,
ImportDataService,
EudrImportService,
EUDRDTOProcessor,
ImportMailService,
ExcelValidatorService,
SourcingDataDbCleaner,
{
provide: 'FILE_UPLOAD_SIZE_LIMIT',
useValue: config.get('fileUploads.sizeLimit'),
Expand Down
34 changes: 0 additions & 34 deletions api/src/modules/import-data/import-data.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export class ImportDataService {
constructor(
private readonly importDataProducer: ImportDataProducer,
private readonly sourcingDataImportService: SourcingDataImportService,
private readonly eudrImport: EudrImportService,
private readonly tasksService: TasksService,
) {}

Expand Down Expand Up @@ -51,43 +50,10 @@ export class ImportDataService {
}
}

async loadEudrFile(
userId: string,
xlsxFileData: Express.Multer.File,
): Promise<Task> {
const { filename, path } = xlsxFileData;
const task: Task = await this.tasksService.createTask({
data: { filename, path },
userId,
});
try {
await this.importDataProducer.addEudrImportJob(xlsxFileData, task.id);
return task;
} catch (error: any) {
this.logger.error(
`Job for file: ${
xlsxFileData.filename
} sent by user: ${userId} could not been added to queue: ${error.toString()}`,
);

await this.tasksService.remove(task.id);
throw new ServiceUnavailableException(
`File: ${xlsxFileData.filename} could not have been loaded. Please try again later or contact the administrator`,
);
}
}

async processImportJob(job: Job<ExcelImportJob>): Promise<void> {
await this.sourcingDataImportService.importSourcingData(
job.data.xlsxFileData.path,
job.data.taskId,
);
}

async processEudrJob(job: Job<EudrImportJob>): Promise<void> {
await this.eudrImport.importEudr(
job.data.xlsxFileData.path,
job.data.taskId,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import {
} from 'modules/import-data/sourcing-data/validation/excel-validator.service';
import { ExcelValidationError } from 'modules/import-data/sourcing-data/validation/validators/excel-validation.error';
import { GeoCodingError } from 'modules/geo-coding/errors/geo-coding.error';
import { SourcingDataDbCleaner } from './sourcing-data.db-cleaner';
import { SourcingLocation } from 'modules/sourcing-locations/sourcing-location.entity';

export interface LocationData {
locationAddressInput?: string;
Expand Down Expand Up @@ -75,21 +77,15 @@ export class SourcingDataImportService {
protected readonly materialService: MaterialsService,
protected readonly businessUnitService: BusinessUnitsService,
protected readonly supplierService: SuppliersService,
protected readonly adminRegionService: AdminRegionsService,
protected readonly geoRegionsService: GeoRegionsService,
protected readonly sourcingLocationService: SourcingLocationsService,
protected readonly sourcingRecordService: SourcingRecordsService,
protected readonly sourcingLocationGroupService: SourcingLocationGroupsService,
protected readonly fileService: FileService<SourcingRecordsSheets>,
protected readonly dtoProcessor: SourcingRecordsDtoProcessorService,
protected readonly geoCodingService: GeoCodingAbstractClass,
protected readonly tasksService: TasksService,
protected readonly scenarioService: ScenariosService,
protected readonly indicatorService: IndicatorsService,
protected readonly indicatorRecordService: IndicatorRecordsService,
protected readonly impactService: ImpactService,
protected readonly impactCalculator: ImpactCalculator,
protected readonly excelValidator: ExcelValidatorService,
protected readonly dbCleaner: SourcingDataDbCleaner,
) {}

async importSourcingData(filePath: string, taskId: string): Promise<any> {
Expand All @@ -99,11 +95,6 @@ export class SourcingDataImportService {
const parsedXLSXDataset: SourcingRecordsSheets =
await this.fileService.transformToJson(filePath, SHEETS_MAP);

const sourcingLocationGroup: SourcingLocationGroup =
await this.sourcingLocationGroupService.create({
title: 'Sourcing Records import from XLSX file',
});

const { data: dtoMatchedData, validationErrors } =
await this.excelValidator.validate(
parsedXLSXDataset as unknown as SourcingDataSheet,
Expand All @@ -114,7 +105,7 @@ export class SourcingDataImportService {

//TODO: Implement transactional import. Move geocoding to first step

await this.cleanDataBeforeImport();
await this.dbCleaner.cleanDataBeforeImport();

const materials: Material[] =
await this.materialService.findAllUnpaginated();
Expand Down Expand Up @@ -171,7 +162,7 @@ export class SourcingDataImportService {
newLogs: warnings,
}));

const sourcingDataWithOrganizationalEntities: any =
const sourcingDataWithOrganizationalEntities: SourcingLocation[] =
await this.relateSourcingDataWithOrganizationalEntities(
suppliers,
businessUnits,
Expand Down Expand Up @@ -209,24 +200,24 @@ export class SourcingDataImportService {
* @note: Deletes DB content from required entities
* to ensure DB is prune prior loading a XLSX dataset
*/
async cleanDataBeforeImport(): Promise<void> {
this.logger.log('Cleaning database before import...');
try {
await this.indicatorService.deactivateAllIndicators();
await this.materialService.deactivateAllMaterials();
await this.scenarioService.clearTable();
await this.indicatorRecordService.clearTable();
await this.businessUnitService.clearTable();
await this.supplierService.clearTable();
await this.sourcingLocationService.clearTable();
await this.sourcingRecordService.clearTable();
await this.geoRegionsService.deleteGeoRegionsCreatedByUser();
} catch ({ message }) {
throw new Error(
`Database could not been cleaned before loading new dataset: ${message}`,
);
}
}
// async cleanDataBeforeImport(): Promise<void> {
// this.logger.log('Cleaning database before import...');
// try {
// await this.indicatorService.deactivateAllIndicators();
// await this.materialService.deactivateAllMaterials();
// await this.scenarioService.clearTable();
// await this.indicatorRecordService.clearTable();
// await this.businessUnitService.clearTable();
// await this.supplierService.clearTable();
// await this.sourcingLocationService.clearTable();
// await this.sourcingRecordService.clearTable();
// await this.geoRegionsService.deleteGeoRegionsCreatedByUser();
// } catch ({ message }) {
// throw new Error(
// `Database could not been cleaned before loading new dataset: ${message}`,
// );
// }
// }

/**
* @note: Type hack as mpath property does not exist on Materials and BusinessUnits, but its created
Expand All @@ -239,7 +230,7 @@ export class SourcingDataImportService {
businessUnits: Record<string, any>[],
materials: Material[],
sourcingData: SourcingData[],
): Promise<SourcingData[] | void> {
): Promise<SourcingLocation[]> {
this.logger.log(`Relating sourcing data with organizational entities`);
this.logger.log(`Supplier count: ${suppliers.length}`);
this.logger.log(`Business Units count: ${businessUnits.length}`);
Expand Down Expand Up @@ -268,9 +259,7 @@ export class SourcingDataImportService {
sourcingLocation.businessUnitId = businessUnit.id;
}
}
if (typeof sourcingLocation.materialId === 'undefined') {
return;
}

const sourcingLocationMaterialId: string = sourcingLocation.materialId;

if (!(sourcingLocationMaterialId in materialMap)) {
Expand All @@ -280,6 +269,6 @@ export class SourcingDataImportService {
}
sourcingLocation.materialId = materialMap[sourcingLocationMaterialId];
}
return sourcingData;
return sourcingData as SourcingLocation[];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { GeoRegion } from 'modules/geo-regions/geo-region.entity';
import { Scenario } from 'modules/scenarios/scenario.entity';
import { DataSource, EntityManager } from 'typeorm';
import { BusinessUnit } from 'modules/business-units/business-unit.entity';
import { SourcingLocation } from 'modules/sourcing-locations/sourcing-location.entity';
import { Supplier } from 'modules/suppliers/supplier.entity';

export class SourcingDataDbCleaner {
constructor(private readonly dataSource: DataSource) {}

async cleanDataBeforeImport(): Promise<void> {
await this.dataSource.transaction(async (manager: EntityManager) => {
await this.deleteExistingUserData(manager);
await this.deactivateAllIndicators(manager);
await this.deactivateAllMaterials(manager);
});
}

private async deactivateAllIndicators(manager: EntityManager): Promise<void> {
await manager.query('UPDATE indicators SET active = false');
}

private async deactivateAllMaterials(manager: EntityManager): Promise<void> {
await manager.query('UPDATE materials SET active = false');
}

private async deleteExistingUserData(manager: EntityManager): Promise<void> {
const entities: any = [Scenario, SourcingLocation, BusinessUnit, Supplier];
for (const entity of entities) {
await manager.getRepository(entity).delete({});
}
await this.deleteGeoRegionsCreatedByUser(manager);
}

private async deleteGeoRegionsCreatedByUser(
manager: EntityManager,
): Promise<void> {
await manager.getRepository(GeoRegion).delete({ isCreatedByUser: true });
}
}
8 changes: 4 additions & 4 deletions api/src/modules/import-data/workers/eudr.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export class ImportDataConsumer {
// TODO: Handle eudr-alerts import completion, updating async tasks
}

@Process('eudr')
async readImportDataJob(job: Job<ExcelImportJob>): Promise<void> {
await this.importDataService.processEudrJob(job);
}
// @Process('eudr')
// async readImportDataJob(job: Job<ExcelImportJob>): Promise<void> {
// await this.importDataService.processEudrJob(job);
// }
}
Loading

0 comments on commit 7b0f826

Please sign in to comment.