Skip to content

Commit

Permalink
refactor: pruning worker processors (#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
BEdev24 authored Sep 11, 2024
1 parent 9ade333 commit dec6507
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 34 deletions.
4 changes: 2 additions & 2 deletions worker-service/src/common/common-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ export abstract class CommonService {
};
return govActionProposal;
} catch (e) {
this.logger.log(
`There has been an exception when fetching data for governance action proposal: ${e}`,
this.logger.warn(
`Error when fetching GAP metadata from url ${url}; Message: ${e}`,
);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import {
import { Logger } from '@nestjs/common';
import { GovActionProposalService } from '../../services/gov-action-proposal.service';


@Processor(QUEUE_NAME_DB_SYNC_GOV_ACTIONS)
export class GovActionsProposalProcessor extends WorkerHost {
protected readonly logger = new Logger(GovActionsProposalProcessor.name);
constructor(
private readonly govActionProposalService: GovActionProposalService,
private readonly govActionProposalService: GovActionProposalService
) {
super();
}
async process(job: Job<any>): Promise<any> {
switch (job.name) {
case JOB_NAME_GOV_ACTIONS_SYNC: {
this.logger.debug('Data from db-sync for gov action proposals job');
return job.data;
try {
this.logger.debug(
`Processing GAP - amount of GAPs to be processed, ${job.data?.length}`,
);
await this.govActionProposalService.storeGovActionProposalData(
job.data,
);
} catch (error) {
this.logger.error(
`Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`,
);
throw error;
}
}
}
}

@OnWorkerEvent('completed')
onCompleted(job: Job) {
const { id, name, queueName, finishedOn, returnvalue } = job;
const completionTime = finishedOn ? new Date(finishedOn).toISOString() : '';
this.govActionProposalService.storeGovActionProposalData(returnvalue);
this.logger.log(
`Job id: ${id}, name: ${name} completed in queue ${queueName} on ${completionTime}.`,
);
}
}
28 changes: 16 additions & 12 deletions worker-service/src/governance/queues/processors/vote.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from '../../../common/constants/bullmq.constants';
import { Logger } from '@nestjs/common';
import { VoteService } from '../../services/vote.service';
import { VoteRequest } from 'src/governance/dto/vote.request';

@Processor(QUEUE_NAME_DB_SYNC_VOTES)
export class VoteProcessor extends WorkerHost {
Expand All @@ -17,19 +18,22 @@ export class VoteProcessor extends WorkerHost {
async process(job: Job<any>): Promise<any> {
switch (job.name) {
case JOB_NAME_VOTE_SYNC: {
this.logger.debug('Data from db-sync for votes job');
return job.data;
try {
const voteRequests: VoteRequest[] = job.data;
const addresses = voteRequests.map(
(voteRequest) => voteRequest.hotAddress,
);
this.logger.debug(
`Processing votes for addresses:, ${JSON.stringify(addresses)}`,
);
await this.voteService.storeVoteData(voteRequests);
} catch (error) {
this.logger.error(
`Error processing job id: ${job.id}, name: ${job.name}. - Error: ${error}`,
);
throw error;
}
}
}
}

@OnWorkerEvent('completed')
onCompleted(job: Job) {
const { id, name, queueName, finishedOn, returnvalue } = job;
const completionTime = finishedOn ? new Date(finishedOn).toISOString() : '';
this.voteService.storeVoteData(returnvalue);
this.logger.log(
`Job id: ${id}, name: ${name} completed in queue ${queueName} on ${completionTime}.`,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,9 @@ export class GovActionProposalService extends CommonService {
private readonly configService: ConfigService,
) {
super(dataSource);
// this.cronInterval =
// this.configService.get<string>('GOV_ACTION_PROPOSALS_JOB_FREQUENCY') ||
// '0 * * * * *';
this.logger = new Logger(GovActionProposalService.name);
}

// getCronExpression(): string {
// return this.cronInterval;
// }

async storeGovActionProposalData(
govActionProposalRequests: GovActionProposalRequest[],
): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions worker-service/src/governance/services/vote.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ export class VoteService extends CommonService {
try {
const prefix = '\\x'; // prefix for each hot address
const addresses = [...mapHotAddresses.keys()].map((key) => prefix + key);
this.logger.debug(`Addresses for fetching from db sync: ${addresses}`);
const dbData = await this.getDataFromSqlFile(
SQL_FILE_PATH.GET_VOTES,
addresses,
Expand Down

0 comments on commit dec6507

Please sign in to comment.