diff --git a/packages/common/src/scorekeeper/RegisterHandler.ts b/packages/common/src/scorekeeper/RegisterHandler.ts index 4616bff49..bb4ed9ca5 100644 --- a/packages/common/src/scorekeeper/RegisterHandler.ts +++ b/packages/common/src/scorekeeper/RegisterHandler.ts @@ -3,17 +3,8 @@ * * @function RegisterHandler */ -import { - ApiHandler, - ChainData, - Config, - logger, - queries, - ScoreKeeper, -} from "../index"; +import { ApiHandler, ChainData, Config, logger, queries } from "../index"; import { scorekeeperLabel } from "./scorekeeper"; -import { jobStatusEmitter } from "../Events"; -import { Job, JobStatus } from "./jobs/JobsClass"; export const registerAPIHandler = ( handler: ApiHandler, @@ -52,52 +43,3 @@ export const registerAPIHandler = ( logger.info(`New Session Event: ${sessionIndex}`, scorekeeperLabel); }); }; - -export const registerEventEmitterHandler = (scoreKeeper: ScoreKeeper) => { - logger.info(`Registering event emitter handler`, scorekeeperLabel); - jobStatusEmitter.on("jobStarted", (data) => { - // scoreKeeper.updateJobStarted(data); - }); - - jobStatusEmitter.on("jobRunning", (data) => { - // scoreKeeper.updateJobRunning(data); - }); - - jobStatusEmitter.on("jobFinished", (data) => { - // scoreKeeper.updateJobFinished(data); - }); - - jobStatusEmitter.on("jobErrored", (data) => { - // scoreKeeper.updateJobErrored(data); - }); - - jobStatusEmitter.on("jobProgress", (data) => { - // scoreKeeper.updateJobProgress(data); - }); -}; - -export const registerJobStatusEventEmitterHandler = (job: Job) => { - logger.info( - `Registering event emitter handler for job: ${job.getName()}`, - scorekeeperLabel, - ); - jobStatusEmitter.on("jobStarted", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobRunning", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobFinished", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobErrored", (data: JobStatus) => { - job.updateJobStatus(data); - }); - - jobStatusEmitter.on("jobProgress", (data: JobStatus) => { - job.updateJobStatus(data); - }); -}; diff --git a/packages/common/src/scorekeeper/jobs/Job.ts b/packages/common/src/scorekeeper/jobs/Job.ts new file mode 100644 index 000000000..85481029c --- /dev/null +++ b/packages/common/src/scorekeeper/jobs/Job.ts @@ -0,0 +1,77 @@ +import { startJob } from "./cron/StartCronJobs"; +import logger from "../../logger"; +import { jobStatusEmitter } from "../../Events"; +import { JobStatus, JobConfig, JobRunnerMetadata } from "./types"; + +export class Job { + protected status: JobStatus; + protected jobConfig: JobConfig; + protected jobRunnerMetadata: JobRunnerMetadata; + + // TODO: remove this dependency injection + private startJobFunction: ( + metadata: JobRunnerMetadata, + jobConfig: JobConfig, + ) => Promise; + + // TODO: remove events and use db to handle the state + // then we can decouple scorekeeper and gateway + static events: string[] = [ + "jobStarted", + "jobRunning", + "jobFinished", + "jobErrored", + "jobProgress", + ]; + + constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { + this.status = { + name: jobConfig.name, + updated: Date.now(), + status: "Not Running", + }; + this.jobConfig = jobConfig; + this.jobRunnerMetadata = jobRunnerMetadata; + this.startJobFunction = startJob; + } + + private log(message: string) { + logger.info(message, { label: "Job" }); + } + + // TODO: remove events and use db to handle the state + // then we can decouple scorekeeper and gateway + private registerEventHandlers() { + this.log(`Registering event handlers for ${this.jobConfig.name}`); + Job.events.forEach((event) => { + jobStatusEmitter.on(event, (data: JobStatus) => { + this.updateJobStatus(data); + }); + }); + } + + public async run(): Promise { + this.registerEventHandlers(); + this.log(`Starting ${this.getName()}`); + await this.startJobFunction(this.jobRunnerMetadata, this.jobConfig); + } + + // TODO: remove this public interface after decoupling with the Gateway + public getName(): string { + return this.jobConfig.name; + } + + public updateJobStatus(status: JobStatus) { + if (status.name == this.getName()) { + this.status = { ...this.status, ...status }; + } + } + + public getStatusAsJson(): string { + return JSON.stringify(this.status); + } + + public getStatus(): JobStatus { + return this.status; + } +} diff --git a/packages/common/src/scorekeeper/jobs/JobConfigs.ts b/packages/common/src/scorekeeper/jobs/JobConfigs.ts index 994fb0eab..c9791687e 100644 --- a/packages/common/src/scorekeeper/jobs/JobConfigs.ts +++ b/packages/common/src/scorekeeper/jobs/JobConfigs.ts @@ -1,5 +1,4 @@ -import { JobConfig, JobRunnerMetadata, jobsLabel } from "./JobsClass"; -import { Constants } from "../../index"; +import * as Constants from "../../constants"; import { activeValidatorJobWithTiming, blockJobWithTiming, @@ -16,12 +15,14 @@ import { validityJobWithTiming, } from "./specificJobs"; import { mainScorekeeperJob } from "./specificJobs/MainScorekeeperJob"; -import logger from "../../logger"; import { executionJob } from "./specificJobs/ExecutionJob"; import { cancelJob } from "./specificJobs/CancelJob"; import { staleNominationJob } from "./specificJobs/StaleNomination"; import { clearOfflineJob } from "./specificJobs/ClearOfflineJob"; +import { JobConfig } from "./types"; +// TODO: remove this enum, generate name based on the jobKey instead, +// store job name as a Job Class attribute export enum JobNames { ActiveValidator = "ActiveValidatorJob", Monitor = "MonitorJob", @@ -43,215 +44,113 @@ export enum JobNames { StaleNomination = "StaleNominationJob", } -export const getJobConfigs = ( - jobRunnerMetadata: JobRunnerMetadata, -): JobConfig[] => { - try { - logger.info(`getting job configs for each job`, jobsLabel); - - const activeValdiatorJobConfig: JobConfig = { - jobKey: "activeValidator", - defaultFrequency: Constants.ACTIVE_VALIDATOR_CRON, - jobFunction: async () => { - await activeValidatorJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.ActiveValidator, - preventOverlap: true, - }; - - const monitorJobConfig: JobConfig = { - jobKey: "monitor", - defaultFrequency: Constants.MONITOR_CRON, - jobFunction: async () => { - await getLatestTaggedRelease(); - }, - name: JobNames.Monitor, - preventOverlap: true, - }; - - const clearOfflineJobConfig: JobConfig = { - jobKey: "clearOffline", - defaultFrequency: Constants.CLEAR_OFFLINE_CRON, - jobFunction: async () => { - await clearOfflineJob(); - }, - name: JobNames.ClearOffline, - preventOverlap: true, - }; - - const validityJobConfig: JobConfig = { - jobKey: "validity", - defaultFrequency: Constants.VALIDITY_CRON, - jobFunction: async () => { - await validityJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Validity, - preventOverlap: true, - }; - - const scoreJobConfig: JobConfig = { - jobKey: "score", - defaultFrequency: Constants.SCORE_CRON, - jobFunction: async () => { - await scoreJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Score, - preventOverlap: true, - }; - - const eraStatsJobConfig: JobConfig = { - jobKey: "eraStats", - defaultFrequency: Constants.ERA_STATS_CRON, - jobFunction: async () => { - await eraStatsJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.EraStats, - preventOverlap: true, - }; - - const eraPointsJobConfig: JobConfig = { - jobKey: "eraPoints", - defaultFrequency: Constants.ERA_POINTS_CRON, - jobFunction: async () => { - await eraPointsJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.EraPoints, - preventOverlap: true, - }; - - const inclusionJobConfig: JobConfig = { - jobKey: "inclusion", - defaultFrequency: Constants.INCLUSION_CRON, - jobFunction: async () => { - await inclusionJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Inclusion, - preventOverlap: true, - }; - - const sessionKeyJobConfig: JobConfig = { - jobKey: "sessionKey", - defaultFrequency: Constants.SESSION_KEY_CRON, - jobFunction: async () => { - await sessionKeyJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.SessionKey, - preventOverlap: true, - }; - - const unclaimedEraJobConfig: JobConfig = { - jobKey: "unclaimedEras", - defaultFrequency: Constants.UNCLAIMED_ERAS_CRON, - jobFunction: async () => { - await unclaimedEraJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.UnclaimedEras, - preventOverlap: true, - }; - - const validatorPrefJobConfig: JobConfig = { - jobKey: "validatorPref", - defaultFrequency: Constants.VALIDATOR_PREF_CRON, - jobFunction: async () => { - await validatorPrefJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.ValidatorPref, - preventOverlap: true, - }; - - const locationStatsJobConfig: JobConfig = { - jobKey: "locationStats", - defaultFrequency: Constants.LOCATION_STATS_CRON, - jobFunction: async () => { - await locationStatsJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.LocationStats, - preventOverlap: true, - }; - - const nominatorJobConfig: JobConfig = { - jobKey: "nominator", - defaultFrequency: Constants.NOMINATOR_CRON, - jobFunction: async () => { - await nominatorJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.Nominator, - preventOverlap: true, - }; - - const blockDataJobConfig: JobConfig = { - jobKey: "block", - defaultFrequency: Constants.BLOCK_CRON, - jobFunction: async () => { - await blockJobWithTiming(jobRunnerMetadata); - }, - name: JobNames.BlockData, - preventOverlap: true, - }; - - const mainScorekeeperJobConfig: JobConfig = { - jobKey: "scorekeeper", - defaultFrequency: Constants.SCOREKEEPER_CRON, - jobFunction: async () => { - await mainScorekeeperJob(jobRunnerMetadata); - }, - name: JobNames.MainScorekeeper, - preventOverlap: true, - }; - - const executionJobConfig: JobConfig = { - jobKey: "execution", - defaultFrequency: Constants.EXECUTION_CRON, - jobFunction: async () => { - await executionJob(jobRunnerMetadata); - }, - name: JobNames.Execution, - preventOverlap: true, - }; - - const cancelJobConfig: JobConfig = { - jobKey: "cancel", - defaultFrequency: Constants.CANCEL_CRON, - jobFunction: async () => { - await cancelJob(jobRunnerMetadata); - }, - name: JobNames.Cancel, - preventOverlap: true, - }; - - const staleNominationJobConfig: JobConfig = { - jobKey: "stale", - defaultFrequency: Constants.STALE_CRON, - jobFunction: async () => { - await staleNominationJob(jobRunnerMetadata); - }, - name: JobNames.StaleNomination, - preventOverlap: true, - }; - - return [ - activeValdiatorJobConfig, - monitorJobConfig, - clearOfflineJobConfig, - validityJobConfig, - scoreJobConfig, - eraStatsJobConfig, - eraPointsJobConfig, - inclusionJobConfig, - sessionKeyJobConfig, - unclaimedEraJobConfig, - validatorPrefJobConfig, - locationStatsJobConfig, - nominatorJobConfig, - blockDataJobConfig, - mainScorekeeperJobConfig, - executionJobConfig, - cancelJobConfig, - staleNominationJobConfig, - ]; - } catch (e) { - logger.error(`Error getting job configs:`, jobsLabel); - logger.error(JSON.stringify(e)); - return []; - } -}; +export const jobConfigs: JobConfig[] = [ + { + jobKey: "activeValidator", + defaultFrequency: Constants.ACTIVE_VALIDATOR_CRON, + jobFunction: activeValidatorJobWithTiming, + name: JobNames.ActiveValidator, + }, + { + jobKey: "monitor", + defaultFrequency: Constants.MONITOR_CRON, + jobFunction: getLatestTaggedRelease, + name: JobNames.Monitor, + }, + { + jobKey: "clearOffline", + defaultFrequency: Constants.CLEAR_OFFLINE_CRON, + jobFunction: clearOfflineJob, + name: JobNames.ClearOffline, + }, + { + jobKey: "validity", + defaultFrequency: Constants.VALIDITY_CRON, + jobFunction: validityJobWithTiming, + name: JobNames.Validity, + }, + { + jobKey: "score", + defaultFrequency: Constants.SCORE_CRON, + jobFunction: scoreJobWithTiming, + name: JobNames.Score, + }, + { + jobKey: "eraStats", + defaultFrequency: Constants.ERA_STATS_CRON, + jobFunction: eraStatsJobWithTiming, + name: JobNames.EraStats, + }, + { + jobKey: "eraPoints", + defaultFrequency: Constants.ERA_POINTS_CRON, + jobFunction: eraPointsJobWithTiming, + name: JobNames.EraPoints, + }, + { + jobKey: "inclusion", + defaultFrequency: Constants.INCLUSION_CRON, + jobFunction: inclusionJobWithTiming, + name: JobNames.Inclusion, + }, + { + jobKey: "sessionKey", + defaultFrequency: Constants.SESSION_KEY_CRON, + jobFunction: sessionKeyJobWithTiming, + name: JobNames.SessionKey, + }, + { + jobKey: "unclaimedEras", + defaultFrequency: Constants.UNCLAIMED_ERAS_CRON, + jobFunction: unclaimedEraJobWithTiming, + name: JobNames.UnclaimedEras, + }, + { + jobKey: "validatorPref", + defaultFrequency: Constants.VALIDATOR_PREF_CRON, + jobFunction: validatorPrefJobWithTiming, + name: JobNames.ValidatorPref, + }, + { + jobKey: "locationStats", + defaultFrequency: Constants.LOCATION_STATS_CRON, + jobFunction: locationStatsJobWithTiming, + name: JobNames.LocationStats, + }, + { + jobKey: "nominator", + defaultFrequency: Constants.NOMINATOR_CRON, + jobFunction: nominatorJobWithTiming, + name: JobNames.Nominator, + }, + { + jobKey: "block", + defaultFrequency: Constants.BLOCK_CRON, + jobFunction: blockJobWithTiming, + name: JobNames.BlockData, + }, + { + jobKey: "scorekeeper", + defaultFrequency: Constants.SCOREKEEPER_CRON, + jobFunction: mainScorekeeperJob, + name: JobNames.MainScorekeeper, + }, + { + jobKey: "execution", + defaultFrequency: Constants.EXECUTION_CRON, + jobFunction: executionJob, + name: JobNames.Execution, + }, + { + jobKey: "cancel", + defaultFrequency: Constants.CANCEL_CRON, + jobFunction: cancelJob, + name: JobNames.Cancel, + }, + { + jobKey: "stale", + defaultFrequency: Constants.STALE_CRON, + jobFunction: staleNominationJob, + name: JobNames.StaleNomination, + }, +]; diff --git a/packages/common/src/scorekeeper/jobs/JobFactory.ts b/packages/common/src/scorekeeper/jobs/JobFactory.ts deleted file mode 100644 index 9b4c69e55..000000000 --- a/packages/common/src/scorekeeper/jobs/JobFactory.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { Job, JobConfig, JobRunnerMetadata, jobsLabel } from "./JobsClass"; -import { - ActiveValidatorJob, - BlockDataJob, - EraPointsJob, - EraStatsJob, - InclusionJob, - LocationStatsJob, - MonitorJob, - NominatorJob, - ScoreJob, - SessionKeyJob, - UnclaimedErasJob, - ValidatorPrefJob, - ValidityJob, -} from "./specificJobs"; -import { ClearOfflineJob } from "./specificJobs/ClearOfflineJob"; -import logger from "../../logger"; -import { MainScorekeeperJob } from "./specificJobs/MainScorekeeperJob"; -import { JobNames } from "./JobConfigs"; -import { ExecutionJob } from "./specificJobs/ExecutionJob"; -import { CancelJob } from "./specificJobs/CancelJob"; -import { StaleNominationJob } from "./specificJobs/StaleNomination"; - -export class JobFactory { - static makeJobs = async ( - jobConfigs: JobConfig[], - jobRunnerMetadata: JobRunnerMetadata, - ): Promise => { - try { - const jobs: Job[] = []; - for (const jobConfig of jobConfigs) { - switch (jobConfig.name) { - case JobNames.ActiveValidator: - jobs.push(new ActiveValidatorJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Monitor: - jobs.push(new MonitorJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.ClearOffline: - jobs.push(new ClearOfflineJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Score: - jobs.push(new ScoreJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Validity: - jobs.push(new ValidityJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.EraStats: - jobs.push(new EraStatsJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.EraPoints: - jobs.push(new EraPointsJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.LocationStats: - jobs.push(new LocationStatsJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.UnclaimedEras: - jobs.push(new UnclaimedErasJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Inclusion: - jobs.push(new InclusionJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.SessionKey: - jobs.push(new SessionKeyJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.ValidatorPref: - jobs.push(new ValidatorPrefJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Nominator: - jobs.push(new NominatorJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.BlockData: - jobs.push(new BlockDataJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.MainScorekeeper: - jobs.push(new MainScorekeeperJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Execution: - jobs.push(new ExecutionJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.Cancel: - jobs.push(new CancelJob(jobConfig, jobRunnerMetadata)); - break; - case JobNames.StaleNomination: - jobs.push(new StaleNominationJob(jobConfig, jobRunnerMetadata)); - break; - default: - logger.error(`Job not found: ${jobConfig.name}`, jobsLabel); - break; - } - } - return jobs; - } catch (e) { - logger.error(`Error making jobs: ${e}`, jobsLabel); - logger.error(JSON.stringify(e), jobsLabel); - return []; - } - }; -} diff --git a/packages/common/src/scorekeeper/jobs/JobRunner.ts b/packages/common/src/scorekeeper/jobs/JobRunner.ts deleted file mode 100644 index 4701cdf22..000000000 --- a/packages/common/src/scorekeeper/jobs/JobRunner.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { logger } from "../../index"; -import { scorekeeperLabel } from "../scorekeeper"; -import { Job, JobRunnerMetadata, jobsLabel } from "./JobsClass"; -import { JobFactory } from "./JobFactory"; -import { getJobConfigs } from "./JobConfigs"; - -export abstract class JobsRunner { - constructor(protected readonly metadata: JobRunnerMetadata) {} - - abstract _startSpecificJobs(): Promise; - - public startJobs = async (): Promise => { - try { - return await this._startSpecificJobs(); - } catch (e) { - logger.warn(`There was an error running some cron jobs...`, jobsLabel); - logger.error(e); - return []; - } - }; -} - -export const startMonolithJobs = async ( - metadata: JobRunnerMetadata, -): Promise => { - try { - const jobs = await JobFactory.makeJobs(getJobConfigs(metadata), metadata); - for (const job of jobs) { - await job.setupAndStartJob(); - } - return jobs; - } catch (e) { - logger.error(JSON.stringify(e), scorekeeperLabel); - logger.error("Error starting monolith jobs", scorekeeperLabel); - return []; - } -}; diff --git a/packages/common/src/scorekeeper/jobs/JobsClass.ts b/packages/common/src/scorekeeper/jobs/JobsClass.ts index 0ad2a90d9..957e7e8a8 100644 --- a/packages/common/src/scorekeeper/jobs/JobsClass.ts +++ b/packages/common/src/scorekeeper/jobs/JobsClass.ts @@ -1,84 +1,3 @@ -import { ApiHandler, ChainData, Config, Constraints } from "../../index"; -import MatrixBot from "../../matrix"; -import Nominator from "../../nominator/nominator"; -import { ConfigSchema } from "../../config"; -import { startJob } from "./cron/StartCronJobs"; -import logger from "../../logger"; -import { registerJobStatusEventEmitterHandler } from "../RegisterHandler"; - -export const jobsLabel = { label: "Jobs" }; - -export type JobRunnerMetadata = { - config: Config.ConfigSchema; - chaindata: ChainData; - nominatorGroups: Nominator[]; - nominating: boolean; - // currentEra: number; - bot: MatrixBot; - constraints: Constraints.OTV; - handler: ApiHandler; - currentTargets: { stash?: string; identity?: any }[]; -}; - -export type JobConfig = { - jobKey: keyof ConfigSchema["cron"] | ""; - defaultFrequency: string; - jobFunction: (metadata: JobRunnerMetadata) => Promise; - name: string; - preventOverlap?: boolean; -}; - -export interface JobStatus { - name: string; - updated: number; - enabled?: boolean; - runCount?: number; - status: string; - frequency?: string; - error?: string; - progress?: number; // Progress from 0 to 100 - iteration?: string; // Name of the current iteration -} - -export abstract class Job { - protected _status: JobStatus; - protected _jobConfig: JobConfig; - protected _jobRunnerMetadata: JobRunnerMetadata; - - constructor(jobConfig: JobConfig, jobRunnerMetadata: JobRunnerMetadata) { - this._status = { - name: jobConfig.name, - updated: Date.now(), - status: "Not Running", - }; - this._jobConfig = jobConfig; - this._jobRunnerMetadata = jobRunnerMetadata; - } - - public setupAndStartJob = async (): Promise => { - logger.info( - `Registering Event Emitter for ${this._jobConfig.name}`, - jobsLabel, - ); - registerJobStatusEventEmitterHandler(this); - logger.info(`Starting ${this._jobConfig.name}`, jobsLabel); - await startJob(this._jobRunnerMetadata, this._jobConfig); - }; - - public getName = (): string => { - return this._jobConfig.name; - }; - - public updateJobStatus(status: JobStatus) { - if (status.name == this._jobConfig.name) { - this._status = { ...this._status, ...status }; - } - } - - public getStatusAsJson(): string { - return JSON.stringify(this._status); - } - public getStatus = (): JobStatus => { - return this._status; - }; -} +export type { JobConfig, JobRunnerMetadata, JobStatus } from "./types"; +export { Job } from "./Job"; +// TODO: remove this file during the next refactoring step diff --git a/packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts b/packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts deleted file mode 100644 index 52912d8af..000000000 --- a/packages/common/src/scorekeeper/jobs/JobsRunnerFactory.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { MicroserviceJobRunner } from "./MicroserviceJobRunner"; -import { MonolithJobRunner } from "./MonolithJobRunner"; -import { JobsRunner } from "./JobRunner"; -import { JobRunnerMetadata } from "./JobsClass"; - -export class JobsRunnerFactory { - static makeJobs = async ( - metadata: JobRunnerMetadata, - ): Promise => { - if (!metadata.config?.redis?.host && metadata.config?.redis?.port) - return new MicroserviceJobRunner(metadata); - else return new MonolithJobRunner(metadata); - }; -} diff --git a/packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts b/packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts deleted file mode 100644 index 72502b537..000000000 --- a/packages/common/src/scorekeeper/jobs/MicroserviceJobRunner.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { logger } from "../..//index"; - -// import { otvWorker } from "@1kv/worker"; -import { scorekeeperLabel } from "../scorekeeper"; -import { JobsRunner } from "./JobRunner"; -import { Job } from "./JobsClass"; - -export class MicroserviceJobRunner extends JobsRunner { - _startSpecificJobs = async (): Promise => { - const { config, chaindata } = this.metadata; - if (!config?.redis?.host || !config?.redis?.port) { - logger.error( - `No redis config found. Microservice Jobs will not be started.`, - scorekeeperLabel, - ); - return []; - } - try { - // Jobs get run in separate worker - logger.info(`Starting bullmq Queues and Workers....`, scorekeeperLabel); - // const releaseMonitorQueue = - // await otvWorker.queues.createReleaseMonitorQueue( - // config.redis.host, - // config.redis.port, - // ); - // const constraintsQueue = await otvWorker.queues.createConstraintsQueue( - // config.redis.host, - // config.redis.port, - // ); - // const chaindataQueue = await otvWorker.queues.createChainDataQueue( - // config.redis.host, - // config.redis.port, - // ); - // const blockQueue = await otvWorker.queues.createBlockQueue( - // config.redis.host, - // config.redis.port, - // ); - // - // const removeRepeatableJobs = true; - // if (removeRepeatableJobs) { - // logger.info(`remove jobs: ${removeRepeatableJobs}`, scorekeeperLabel); - // // Remove any previous repeatable jobs - // await otvWorker.queues.removeRepeatableJobsFromQueues([ - // releaseMonitorQueue, - // constraintsQueue, - // chaindataQueue, - // blockQueue, - // ]); - // } - // - // const obliterateQueues = false; - // if (obliterateQueues) { - // await otvWorker.queues.obliterateQueues([ - // releaseMonitorQueue, - // constraintsQueue, - // chaindataQueue, - // blockQueue, - // ]); - // } - // - // // Add repeatable jobs to the queues - // // Queues need to have different repeat time intervals - // await otvWorker.queues.addReleaseMonitorJob(releaseMonitorQueue, 60000); - // await otvWorker.queues.addValidityJob(constraintsQueue, 1000001); - // await otvWorker.queues.addScoreJob(constraintsQueue, 100002); - // await otvWorker.queues.addActiveValidatorJob(chaindataQueue, 100003); - // await otvWorker.queues.addEraPointsJob(chaindataQueue, 100006); - // await otvWorker.queues.addEraStatsJob(chaindataQueue, 110008); - // await otvWorker.queues.addInclusionJob(chaindataQueue, 100008); - // await otvWorker.queues.addNominatorJob(chaindataQueue, 100009); - // await otvWorker.queues.addSessionKeyJob(chaindataQueue, 100010); - // await otvWorker.queues.addValidatorPrefJob(chaindataQueue, 100101); - // await otvWorker.queues.addAllBlocks(blockQueue, chaindata); - // TODO update this as queue job - // await startLocationStatsJob(this.config, this.chaindata); - return []; - } catch (e) { - logger.error(JSON.stringify(e), scorekeeperLabel); - logger.error("Error starting microservice jobs", scorekeeperLabel); - return []; - } - }; -} diff --git a/packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts b/packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts deleted file mode 100644 index 61239a9b0..000000000 --- a/packages/common/src/scorekeeper/jobs/MonolithJobRunner.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { JobsRunner, startMonolithJobs } from "./JobRunner"; -import { Job } from "./JobsClass"; - -export class MonolithJobRunner extends JobsRunner { - _startSpecificJobs = async (): Promise => { - return await startMonolithJobs(this.metadata); - }; -} diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts index 92bf00c61..baad30eb2 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ClearOfflineJob.ts @@ -9,7 +9,9 @@ export class ClearOfflineJob extends Job { } } -export const clearOfflineJob = async () => { +export const clearOfflineJob = async ( + jobRunnerMetadata?: JobRunnerMetadata, +) => { jobStatusEmitter.emit("jobProgress", { name: JobNames.ClearOffline, progress: 0, diff --git a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts index 9167c7f42..7eb0db0c2 100644 --- a/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts +++ b/packages/common/src/scorekeeper/jobs/specificJobs/ReleaseMonitorJob.ts @@ -12,7 +12,9 @@ export class MonitorJob extends Job { } } -export const getLatestTaggedRelease = async () => { +export const getLatestTaggedRelease = async ( + jobRunnerMetadata?: JobRunnerMetadata, +) => { try { const start = Date.now(); diff --git a/packages/common/src/scorekeeper/jobs/types.ts b/packages/common/src/scorekeeper/jobs/types.ts new file mode 100644 index 000000000..626a383cc --- /dev/null +++ b/packages/common/src/scorekeeper/jobs/types.ts @@ -0,0 +1,44 @@ +import { ApiHandler, ChainData, Config, Constraints } from "../../index"; +import MatrixBot from "../../matrix"; +import Nominator from "../../nominator/nominator"; + +export type JobRunnerMetadata = { + config: Config.ConfigSchema; + chaindata: ChainData; + nominatorGroups: Nominator[]; + nominating: boolean; + bot: MatrixBot; + constraints: Constraints.OTV; + handler: ApiHandler; + currentTargets: { stash?: string; identity?: any }[]; +}; + +export type JobConfig = { + jobKey: keyof Config.ConfigSchema["cron"] | ""; + defaultFrequency: string; + jobFunction: (metadata: JobRunnerMetadata) => Promise; + name: string; + preventOverlap?: boolean; +}; + +// There is a dependency on status names in scorekeeper-status-ui +type StatusName = + | "running" + | "finished" + | "errored" + | "started" + | "Not Running"; + +export type JobStatus = { + name: string; + updated: number; + enabled?: boolean; + runCount?: number; + status: StatusName; + frequency?: string; + error?: string; + // Progress from 0 to 100 + progress?: number; + // Name of the current iteration + iteration?: string; +}; diff --git a/packages/common/src/scorekeeper/scorekeeper.ts b/packages/common/src/scorekeeper/scorekeeper.ts index e0f7eeb6f..67aa83a59 100644 --- a/packages/common/src/scorekeeper/scorekeeper.ts +++ b/packages/common/src/scorekeeper/scorekeeper.ts @@ -10,16 +10,13 @@ import { } from "../index"; import Nominator from "../nominator/nominator"; -import { - registerAPIHandler, - registerEventEmitterHandler, -} from "./RegisterHandler"; -import { Job, JobRunnerMetadata, JobStatus } from "./jobs/JobsClass"; -import { JobsRunnerFactory } from "./jobs/JobsRunnerFactory"; +import { registerAPIHandler } from "./RegisterHandler"; +import { Job } from "./jobs/Job"; +import { JobRunnerMetadata, JobStatus } from "./jobs/types"; +import { jobConfigs } from "./jobs/JobConfigs"; import { startRound } from "./Round"; import { NominatorStatus } from "../types"; import { setAllIdentities } from "../utils"; -// import { monitorJob } from "./jobs"; export type NominatorGroup = Config.NominatorConfig[]; @@ -63,7 +60,6 @@ export default class ScoreKeeper { this.upSince = Date.now(); registerAPIHandler(this.handler, this.config, this.chaindata, this.bot); - registerEventEmitterHandler(this); } public getJobsStatusAsJson() { const statuses: Record = {}; @@ -74,25 +70,13 @@ export default class ScoreKeeper { } getAllNominatorBondedAddresses(): string[] { - const bondedAddresses = []; - const nomGroup = this.nominatorGroups; - if (nomGroup) { - for (const nom of nomGroup) { - bondedAddresses.push(nom?.bondedAddress); - } - - return bondedAddresses; - } else { - return []; - } + return this.nominatorGroups + ? this.nominatorGroups.map((nom) => nom?.bondedAddress) + : []; } getAllNominatorStatus(): NominatorStatus[] { - const statuses = []; - for (const nom of this.nominatorGroups) { - statuses.push(nom.getStatus()); - } - return statuses; + return this.nominatorGroups.map((nom) => nom.getStatus()); } getAllNominatorStatusJson() { @@ -235,6 +219,14 @@ export default class ScoreKeeper { return true; } + startJobs(metadata: JobRunnerMetadata) { + this._jobs = jobConfigs.map((config) => { + const job = new Job(config, metadata); + job.run(); + return job; + }); + } + // Begin the main workflow of the scorekeeper async begin(): Promise { try { @@ -294,9 +286,7 @@ export default class ScoreKeeper { currentTargets: this.currentTargets, }; - const jobRunner = await JobsRunnerFactory.makeJobs(metadata); - - this._jobs = await jobRunner.startJobs(); + this.startJobs(metadata); this.isStarted = true; return true; } catch (e) { diff --git a/packages/common/test/scorekeeper/scorekeeper.int.test.ts b/packages/common/test/scorekeeper/scorekeeper.int.test.ts index 787758e6e..f3858d0d9 100644 --- a/packages/common/test/scorekeeper/scorekeeper.int.test.ts +++ b/packages/common/test/scorekeeper/scorekeeper.int.test.ts @@ -1,6 +1,7 @@ import { beforeAll, describe, expect, it } from "vitest"; import { ScoreKeeper } from "../../src"; import { getAndStartScorekeeper } from "../testUtils/scorekeeper"; +import { jobStatusEmitter } from "../../src/Events"; const TIMEOUT_DURATION = 5200000; // 120 seconds describe("Scorekeeper Integration Tests", () => { @@ -24,16 +25,41 @@ describe("Scorekeeper Integration Tests", () => { it( "should start jobs and have their status as started", async () => { - const status = scorekeeper.getJobsStatusAsJson(); - for (const key in status) { - if (status.hasOwnProperty(key)) { - const job = status[key]; - if (job.status !== "started") { - console.error(`Job ${job.name} is not started.`); - } - expect(job.status).toBeDefined(); + const statusJson = scorekeeper.getJobsStatusAsJson(); + const statuses = Object.values(statusJson); + statuses.forEach((job) => { + if (job.status !== "started") { + console.error(`Job ${job.name} is not started.`); } - } + expect(job.status).toBeDefined(); + }); + }, + TIMEOUT_DURATION, + ); + + it( + "should update status of the job on event", + async () => { + const statusesBefore = scorekeeper.getJobsStatusAsJson(); + const firstJob = Object.values(statusesBefore)[0]; + const secondJob = Object.values(statusesBefore)[1]; + + jobStatusEmitter.emit("jobRunning", { + name: firstJob.name, + status: "running", + updated: Date.now(), + }); + + await new Promise((resolve) => setTimeout(resolve, 500)); + + const statusesAfter = scorekeeper.getJobsStatusAsJson(); + const updatedFirstJob = Object.values(statusesAfter)[0]; + const unchangedSecondJob = Object.values(statusesAfter)[1]; + + // Check that the first job's status has been updated + expect(updatedFirstJob.status).toBe("running"); + // Check that the second job's status remains unchanged + expect(unchangedSecondJob.status).toBe(secondJob.status); }, TIMEOUT_DURATION, );