From ca7a5e353db896dfe51d45f323607a7c1391e3e7 Mon Sep 17 00:00:00 2001 From: dhanush-2397 Date: Fri, 17 Nov 2023 12:52:55 +0530 Subject: [PATCH 1/2] Added a processor group state change API --- .../controller/specification.controller.ts | 20 ++--- src/specifications/dto/specData.dto.ts | 14 +--- .../service/grammar/grammar.service.ts | 77 ------------------- .../processor-group-state.service.spec.ts | 18 +++++ .../processor-group-state.service.ts | 70 +++++++++++++++++ src/specifications/specifications.module.ts | 54 +++++++------ 6 files changed, 134 insertions(+), 119 deletions(-) delete mode 100644 src/specifications/service/grammar/grammar.service.ts create mode 100644 src/specifications/service/processor-group-state/processor-group-state.service.spec.ts create mode 100644 src/specifications/service/processor-group-state/processor-group-state.service.ts diff --git a/src/specifications/controller/specification.controller.ts b/src/specifications/controller/specification.controller.ts index 7d42cdf..07dc87d 100644 --- a/src/specifications/controller/specification.controller.ts +++ b/src/specifications/controller/specification.controller.ts @@ -11,19 +11,20 @@ import { specDimensionDTO, scheduleDto, specEventDTO, - s3DTO, GetGrammar + ProcessorDto } from '../dto/specData.dto'; import {DatasetService} from '../service/dataset/dataset.service'; import {ScheduleService} from '../service/schedule/schedule.service'; import {ApiTags} from '@nestjs/swagger'; -import {Grammar} from '../service/grammar/grammar.service'; import { ReadSchemaService } from '../service/read-schema/read-schema.service'; +import { ProcessorGroupStateService } from '../service/processor-group-state/processor-group-state.service'; @ApiTags('spec-ms') @Controller('') export class SpecificationController { constructor(private dimensionService: DimensionService, private EventService: EventService, private datasetService: DatasetService, - private scheduleService: ScheduleService, private readJsonFiles: ReadSchemaService, private grammar: Grammar, private pipelineService: PipelineService) { + private scheduleService: ScheduleService, private readJsonFiles: ReadSchemaService, private pipelineService: PipelineService, + private processorStateService: ProcessorGroupStateService) { } @Get('/hello') @@ -134,19 +135,18 @@ export class SpecificationController { } } - @Get('/grammar') - async getGrammar(@Query()getGrammar: GetGrammar, @Res() response: Response) { + @Post('/change-state') + async processorState(@Body() inputBody:ProcessorDto, @Res()response: Response) { try { - const result: any = await this.grammar.getGrammar(getGrammar); + const result: any = await this.processorStateService.changeProcessorGroupState(inputBody); if (result?.code == 400) { response.status(400).send({"message": result.error}); } else { - response.status(200).send({"schema": result.data.schema}); + response.status(200).send({"message": result.message}); } } catch (error) { - console.error('specification.controller.getGrammar: ', error); - throw new Error(error); + console.error('changeprocessorState: ', error); } - } + } } diff --git a/src/specifications/dto/specData.dto.ts b/src/specifications/dto/specData.dto.ts index f751db8..9224e8d 100644 --- a/src/specifications/dto/specData.dto.ts +++ b/src/specifications/dto/specData.dto.ts @@ -91,17 +91,11 @@ export class scheduleDto { program_name?: string } - -export class s3DTO { +export class ProcessorDto{ @ApiProperty() - scheduled_at?: string; + processor_group_name?: string; @ApiProperty() - scheduled_type: string; + state: string; } -export class GetGrammar { - @ApiProperty() - grammar_type: string; - @ApiProperty() - grammar_name: string; -} \ No newline at end of file + diff --git a/src/specifications/service/grammar/grammar.service.ts b/src/specifications/service/grammar/grammar.service.ts deleted file mode 100644 index 500e6a9..0000000 --- a/src/specifications/service/grammar/grammar.service.ts +++ /dev/null @@ -1,77 +0,0 @@ -import {GenericFunction} from "../genericFunction"; -import {Injectable} from "@nestjs/common"; -import {GetGrammar} from "../../dto/specData.dto"; -import {getGrammar} from '../../queries/queries'; -import {DataSource} from "typeorm"; - -@Injectable() -export class Grammar { - constructor(private dataSource: DataSource, private service: GenericFunction,) { - } - - async getGrammar(inputData: GetGrammar) { - let getGrammarSchema = { - "type": "object", - "properties": { - "grammar_type": { - "type": "string", - "enum": [ - "event", - "dataset", - "dimension" - ], - "shouldnotnull": true - }, - "grammar_name": { - "type": "string", - "shouldnotnull": true - }, - - }, - "required": [ - "grammar_type", - "grammar_name" - ] - }; - const grammarName = inputData.grammar_name; - const grammarType = inputData.grammar_type; - const isValidSchema: any = await this.service.ajvValidator(getGrammarSchema, inputData); - if (isValidSchema.errors) { - return {code: 400, error: isValidSchema.errors} - } - else { - let tableName; - switch (grammarType) { - case 'event': - tableName = 'EventGrammar'; - break; - case 'dataset': - tableName = 'DatasetGrammar'; - break; - default: - tableName = 'DimesnionGrammar'; - break; - } - - try { - const queryStr = await getGrammar(tableName, grammarName); - const queryResult = await this.dataSource.query(queryStr.query, queryStr.values); - if (queryResult.length > 0) { - return { - "code": 200, - "data": queryResult[0] - } - } - else { - return { - "code": 400, - "error": "No records found" - } - } - } catch (e) { - console.error('grammar.service.getGrammar: ', e.message); - throw new Error(e); - } - } - } -} \ No newline at end of file diff --git a/src/specifications/service/processor-group-state/processor-group-state.service.spec.ts b/src/specifications/service/processor-group-state/processor-group-state.service.spec.ts new file mode 100644 index 0000000..70bb26e --- /dev/null +++ b/src/specifications/service/processor-group-state/processor-group-state.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ProcessorGroupStateService } from './processor-group-state.service'; + +describe('ProcessorGroupStateService', () => { + let service: ProcessorGroupStateService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ProcessorGroupStateService], + }).compile(); + + service = module.get(ProcessorGroupStateService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/src/specifications/service/processor-group-state/processor-group-state.service.ts b/src/specifications/service/processor-group-state/processor-group-state.service.ts new file mode 100644 index 0000000..6ccde87 --- /dev/null +++ b/src/specifications/service/processor-group-state/processor-group-state.service.ts @@ -0,0 +1,70 @@ +import { Injectable } from "@nestjs/common"; +import { ScheduleService } from "../schedule/schedule.service"; +import { HttpCustomService } from "../HttpCustomService"; +import { ProcessorDto } from "src/specifications/dto/specData.dto"; +import { GenericFunction } from "../genericFunction"; + +@Injectable() +export class ProcessorGroupStateService { + scheduleSchema = { + type: "object", + properties: { + processor_group_name: { + type: "string", + shouldnotnull: true, + }, + state: { + type: "string", + shouldnotnull: true, + enum: ["RUNNING", "STOPPED"], + }, + }, + required: ["processor_group_name", "state"], + }; + nifiUrl: string = `${process.env.NIFI_HOST}:${process.env.NIFI_PORT}`; + constructor( + private scheduleService: ScheduleService, + private http: HttpCustomService, + private specService: GenericFunction + ) {} + async changeProcessorGroupState(inputProcessorData: ProcessorDto) { + let isValidSchema: any; + isValidSchema = await this.specService.ajvValidator( + this.scheduleSchema, + inputProcessorData + ); + if (isValidSchema.errors) { + return { code: 400, error: isValidSchema.errors }; + } else { + try { + const processorGroups = await this.scheduleService.getRootDetails(); + let pg_list = processorGroups.data; + let processorGroupName = inputProcessorData.processor_group_name; + let counter = 0; + let data = {}; + let pg_group = pg_list["processGroupFlow"]["flow"]["processGroups"]; + for (let pg of pg_group) { + if (pg.component.name == processorGroupName) { + let pg_source = pg; + counter = counter + 1; + data = { + id: pg_source["component"]["id"], + state: inputProcessorData.state, // RUNNING or STOP + disconnectedNodeAcknowledged: false, + }; + const result = await this.http.put( + `${this.nifiUrl}/nifi-api/flow/process-groups/${pg_source["component"]["id"]}`, + data + ); + console.log("the result is:",result.data); + return {code: 200,message:`changed the state of ${processorGroupName}`} + } + } + return {code:400,error:"Could not find the processor group"} + } catch (error) { + return {code:400,error:error?.message} + + } + } + } +} diff --git a/src/specifications/specifications.module.ts b/src/specifications/specifications.module.ts index 0517140..5bbfe95 100644 --- a/src/specifications/specifications.module.ts +++ b/src/specifications/specifications.module.ts @@ -1,25 +1,35 @@ -import {HttpCustomService} from './service/HttpCustomService'; -import {EventService} from './service/event/event.service'; -import {Dimension} from './../typeorm/dimension.entity'; -import {Module} from '@nestjs/common'; -import {SpecificationController} from './controller/specification.controller'; -import {TypeOrmModule} from '@nestjs/typeorm'; -import {DimensionService} from './service/dimension/dimension.service'; -import {GenericFunction} from './service/genericFunction'; -import {TransformerService} from './service/transformer/transformer.service'; -import {DatasetService} from './service/dataset/dataset.service'; -import {HttpModule} from '@nestjs/axios'; -import { PipelineService } from '../specifications/service/pipeline-old/pipeline.service'; -import {ScheduleService} from './service/schedule/schedule.service'; -import { PipelineGenericService } from './service/pipeline-generic/pipeline-generic.service'; -import {Grammar} from "./service/grammar/grammar.service"; -import { ReadSchemaService } from './service/read-schema/read-schema.service'; +import { HttpCustomService } from "./service/HttpCustomService"; +import { EventService } from "./service/event/event.service"; +import { Dimension } from "./../typeorm/dimension.entity"; +import { Module } from "@nestjs/common"; +import { SpecificationController } from "./controller/specification.controller"; +import { TypeOrmModule } from "@nestjs/typeorm"; +import { DimensionService } from "./service/dimension/dimension.service"; +import { GenericFunction } from "./service/genericFunction"; +import { TransformerService } from "./service/transformer/transformer.service"; +import { DatasetService } from "./service/dataset/dataset.service"; +import { HttpModule } from "@nestjs/axios"; +import { PipelineService } from "../specifications/service/pipeline-old/pipeline.service"; +import { ScheduleService } from "./service/schedule/schedule.service"; +import { PipelineGenericService } from "./service/pipeline-generic/pipeline-generic.service"; +import { ReadSchemaService } from "./service/read-schema/read-schema.service"; +import { ProcessorGroupStateService } from "./service/processor-group-state/processor-group-state.service"; @Module({ - imports: [HttpModule], - controllers: [SpecificationController], - providers: [DimensionService, EventService, GenericFunction, TransformerService, DatasetService, PipelineService, HttpCustomService, ScheduleService, Grammar, PipelineGenericService,ReadSchemaService], - + imports: [HttpModule], + controllers: [SpecificationController], + providers: [ + DimensionService, + EventService, + GenericFunction, + TransformerService, + DatasetService, + PipelineService, + HttpCustomService, + ScheduleService, + PipelineGenericService, + ReadSchemaService, + ProcessorGroupStateService + ], }) -export class SpecificationsModule { -} +export class SpecificationsModule {} From 3e63bb44e2e0f3dc1da3a96656b999bf0fc2e015 Mon Sep 17 00:00:00 2001 From: dhanush-2397 Date: Mon, 11 Dec 2023 13:21:05 +0530 Subject: [PATCH 2/2] API added and YAML changes --- spec.yaml | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/spec.yaml b/spec.yaml index 83236ea..01e0d6b 100644 --- a/spec.yaml +++ b/spec.yaml @@ -201,6 +201,42 @@ paths: description: "Something went wrong" schema: $ref: "#/definitions/generic_error" + + /spec/change-state: + post: + tags: + - "spec" + summary: "Change the state of a processor group in NiFi" + description: "Change the state of a processor group in NiFi. It can either start the processor group or stop iy." + operationId: "changeState" + produces: + - "application/json" + parameters: + - in: "body" + name: "body" + required: true + schema: + type: "object" + properties: + processor_group_name: + type: "string" + example: "Run_adapters" + state: + type: "string" + example: "STOPPED" + responses: + 200: + description: "Successfully changed the state" + schema: + type: "object" + properties: + message: + type: "string" + example: "changed the state of Run_adapters" + 400: + description: "Something went wrong" + schema: + $ref: "#/definitions/generic_error" definitions: generic_error: