diff --git a/package-lock.json b/package-lock.json index 5e5dd0b..d84dcef 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "@types/shortid": "0.0.31", "@types/sshpk": "^1.17.3", "bee-queue": "^1.6.0", + "consul": "^2.0.1", "dotenv": "^16.3.1", "dots-wrapper": "^3.11.3", "envalid": "^8.0.0", @@ -1555,6 +1556,29 @@ "url": "https://github.com/open-cli-tools/concurrently?sponsor=1" } }, + "node_modules/consul": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/consul/-/consul-2.0.1.tgz", + "integrity": "sha512-91ExUUelOJ1yyB0etYAR0w1p6Ues1VosEyBVxPcWJdnQDTKqAEFzL0MHfOqZWYI2d4HZ4FgotHZkAPW2A/xahA==", + "license": "MIT", + "dependencies": { + "papi": "^1.1.0", + "uuid": "^10.0.0" + } + }, + "node_modules/consul/node_modules/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/content-disposition": { "version": "0.5.4", "resolved": "https://registry.npmjs.org/content-disposition/-/content-disposition-0.5.4.tgz", @@ -4769,6 +4793,15 @@ "node": ">=8" } }, + "node_modules/papi": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/papi/-/papi-1.1.2.tgz", + "integrity": "sha512-cwM6pPpfAYgPe3EQi23SmB5J5s4XFS9lou9z63I5BbnMGmFaR8LAKvKboW7n1IUAKj76OtnyK0YU16JjnZrqVg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -7431,6 +7464,22 @@ "yargs": "^17.7.2" } }, + "consul": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/consul/-/consul-2.0.1.tgz", + "integrity": "sha512-91ExUUelOJ1yyB0etYAR0w1p6Ues1VosEyBVxPcWJdnQDTKqAEFzL0MHfOqZWYI2d4HZ4FgotHZkAPW2A/xahA==", + "requires": { + "papi": "^1.1.0", + "uuid": "^10.0.0" + }, + "dependencies": { + "uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==" + } + } + }, "content-disposition": { "version": "0.5.4", "resolved": "https://registry.npmjs.org/content-disposition/-/content-disposition-0.5.4.tgz", @@ -9977,6 +10026,11 @@ "resolved": "https://registry.npmjs.org/p-finally/-/p-finally-2.0.1.tgz", "integrity": "sha512-vpm09aKwq6H9phqRQzecoDpD8TmVyGw70qmWlyq5onxY7tqyTTFVvxMykxQSQKILBSFlbXpypIw2T1Ml7+DDtw==" }, + "papi": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/papi/-/papi-1.1.2.tgz", + "integrity": "sha512-cwM6pPpfAYgPe3EQi23SmB5J5s4XFS9lou9z63I5BbnMGmFaR8LAKvKboW7n1IUAKj76OtnyK0YU16JjnZrqVg==" + }, "parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", diff --git a/package.json b/package.json index 4c8e1cb..ad116e0 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "@types/shortid": "0.0.31", "@types/sshpk": "^1.17.3", "bee-queue": "^1.6.0", + "consul": "^2.0.1", "dotenv": "^16.3.1", "dots-wrapper": "^3.11.3", "envalid": "^8.0.0", diff --git a/src/app.ts b/src/app.ts index 085d0ea..559bef2 100644 --- a/src/app.ts +++ b/src/app.ts @@ -447,7 +447,7 @@ app.put( body('maxDesired').optional().isInt({ min: 0 }).withMessage('Value must be positive'), body('desiredCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'), body().custom(async (value, { req }) => { - if (!(await validator.groupHasValidDesiredInput(req.params.name, value))) { + if (!(await validator.groupHasValidDesiredInput(req.context, req.params.name, value))) { throw new Error('Desired count must be between min and max; min cannot be grater than max'); } return true; diff --git a/src/autoscaler.ts b/src/autoscaler.ts index d621913..c6e9c8d 100644 --- a/src/autoscaler.ts +++ b/src/autoscaler.ts @@ -46,7 +46,7 @@ export default class AutoscaleProcessor { } try { - const group = await this.instanceGroupManager.getInstanceGroup(groupName); + const group = await this.instanceGroupManager.getInstanceGroup(ctx, groupName); if (!group) { throw new Error(`Group ${groupName} not found, failed to process autoscaling`); } diff --git a/src/cloud_manager.ts b/src/cloud_manager.ts index ca520cd..97b5f54 100644 --- a/src/cloud_manager.ts +++ b/src/cloud_manager.ts @@ -64,7 +64,12 @@ export default class CloudManager { }; await this.instanceTracker.track(ctx, state); if (isScaleDownProtected) { - await this.shutdownManager.setScaleDownProtected(ctx, instanceId, group.protectedTTLSec); + await this.shutdownManager.setScaleDownProtected( + ctx, + group.name, + instanceId, + group.protectedTTLSec, + ); ctx.logger.info( `[CloudManager] Instance ${instanceId} from group ${group.name} is in protected mode`, ); diff --git a/src/consul.ts b/src/consul.ts new file mode 100644 index 0000000..c9f783e --- /dev/null +++ b/src/consul.ts @@ -0,0 +1,398 @@ +import Consul from 'consul'; +import { Context } from './context'; +import { GetItem } from 'consul/lib/kv'; +import InstanceStore, { InstanceDetails, InstanceGroup, InstanceState } from './instance_store'; +import { CloudInstance } from './cloud_manager'; + +// implments the InstanceStore interface using consul K/V API calls +// uses the got library to make HTTP requests + +export interface ConsulOptions { + host: string; + port: number; + secure: boolean; + groupsPrefix?: string; + valuesPrefix?: string; + instancesPrefix?: string; + client?: Consul; +} + +interface TTLValue { + expires: number; + status: string; +} + +interface TTLValueMap { + [key: string]: TTLValue; +} + +export default class ConsulStore implements InstanceStore { + private client: Consul; + private groupsPrefix = 'autoscaler/groups/'; + private valuesPrefix = 'autoscaler/values/'; + private instancesPrefix = 'autoscaler/instances/'; + + constructor(options: ConsulOptions) { + if (options.client) { + this.client = options.client; + } else { + this.client = new Consul(options); + } + if (options.groupsPrefix) { + this.groupsPrefix = options.groupsPrefix; + } + if (options.valuesPrefix) { + this.valuesPrefix = options.valuesPrefix; + } + if (options.instancesPrefix) { + this.instancesPrefix = options.instancesPrefix; + } + } + + // shutdown related methods + async setShutdownStatus( + ctx: Context, + instanceDetails: InstanceDetails[], + status: string, + ttl: number, + ): Promise { + const p: Promise[] = []; + for (const instance of instanceDetails) { + ctx.logger.debug(`setting shutdown status for instance`, { instance, status }); + p.push( + this.writeTTLValue( + ctx, + `${this.groupsPrefix}${instance.group}/shutdown/${instance.instanceId}`, + status, + ttl, + ), + ); + } + + return (await Promise.allSettled(p)) + .map((r) => r.status === 'fulfilled' && r.value === true) + .reduce((a, b) => a && b, true); + } + + async fetchShutdownStatus(ctx: Context, group: string, clean = true): Promise { + return this.fetchRecursiveTTLValues(ctx, `${this.groupsPrefix}${group}/shutdown`, clean); + } + + async getShutdownStatuses(ctx: Context, group: string, instanceIds: string[]): Promise { + const groupShutdownInstanceIds = Object.keys(await this.fetchShutdownStatus(ctx, group)); + return instanceIds.map((instanceId) => groupShutdownInstanceIds.includes(instanceId)); + } + + async fetchShutdownConfirmations(ctx: Context, group: string): Promise { + return this.fetchRecursiveTTLValues(ctx, `${this.groupsPrefix}${group}/confirmation`); + } + + async getShutdownConfirmations(ctx: Context, group: string, instanceIds: string[]): Promise<(string | false)[]> { + const groupShutdownConfirmations = await this.fetchShutdownConfirmations(ctx, group); + return instanceIds.map((instanceId) => { + const confirmation = groupShutdownConfirmations[instanceId]; + if (confirmation) { + return confirmation.status; + } else { + return false; + } + }); + } + + async getShutdownStatus(ctx: Context, group: string, instanceId: string): Promise { + const v = await this.fetchTTLValue(ctx, `${this.groupsPrefix}${group}/shutdown/${instanceId}`); + return v !== undefined; + } + + async getShutdownConfirmation(ctx: Context, group: string, instanceId: string): Promise { + const v = await this.fetchTTLValue(ctx, `${this.groupsPrefix}${group}/confirmation/${instanceId}`); + if (v) { + return v.status; + } else { + return false; + } + } + + async setShutdownConfirmation( + ctx: Context, + instanceDetails: InstanceDetails[], + status: string, + ttl: number, + ): Promise { + const p: Promise[] = []; + for (const instance of instanceDetails) { + ctx.logger.debug(`setting shutdown confirmation for instance`, { instance, status }); + p.push( + this.writeTTLValue( + ctx, + `${this.groupsPrefix}${instance.group}/confirmation/${instance.instanceId}`, + status, + ttl, + ), + ); + } + + return (await Promise.allSettled(p)) + .map((r) => r.status === 'fulfilled' && r.value === true) + .reduce((a, b) => a && b, true); + } + + async setScaleDownProtected( + ctx: Context, + group: string, + instanceId: string, + protectedTTL: number, + mode: string, + ): Promise { + return this.writeTTLValue(ctx, `${this.groupsPrefix}${group}/protected/${instanceId}`, mode, protectedTTL); + } + + async areScaleDownProtected(ctx: Context, group: string, instanceIds: string[]): Promise { + const res = await this.fetchRecursiveTTLValues(ctx, `${this.groupsPrefix}${group}/protected`); + const scaleProtectedInstances = Object.keys(res); + + return instanceIds.map((instanceId) => scaleProtectedInstances.includes(instanceId)); + } + + // reconfigure related methods + async setReconfigureDate( + ctx: Context, + instanceDetails: InstanceDetails[], + date: string, + ttl: number, + ): Promise { + const p = []>[]; + for (const instance of instanceDetails) { + p.push(this.writeTTLValue(ctx, `${this.instancesPrefix}/reconfigure/${instance.instanceId}`, date, ttl)); + } + + return (await Promise.allSettled(p)) + .map((r) => r.status === 'fulfilled' && r.value === true) + .reduce((a, b) => a && b, true); + } + + async unsetReconfigureDate(ctx: Context, instanceId: string, group: string): Promise { + return this.delete(`${this.groupsPrefix}${group}/reconfigure/${instanceId}`); + } + + async getReconfigureDates(ctx: Context, group: string, instanceIds: string[]): Promise { + const res = await this.fetchRecursiveTTLValues(ctx, `${this.groupsPrefix}${group}/reconfigure`); + return instanceIds.map((instanceId) => { + const reconfigure = res[`${this.groupsPrefix}${group}/reconfigure/${instanceId}`]; + if (reconfigure) { + return reconfigure.status; + } else { + return ''; + } + }); + } + async getReconfigureDate(ctx: Context, group: string, instanceId: string): Promise { + try { + const v = await this.fetch(ctx, `${this.groupsPrefix}${group}/reconfigure/${instanceId}`); + if (v) { + const reconfigure = JSON.parse(v.Value); + return reconfigure.status; + } else { + return ''; + } + } catch (err) { + ctx.logger.error(`Failed to get reconfigure date from consul: ${err}`, { err }); + throw err; + } + } + + async getInstanceGroup(ctx: Context, group: string): Promise { + try { + const v = await this.fetch(ctx, `${this.groupsPrefix}${group}`); + if (v) { + return JSON.parse(v.Value); + } else { + return undefined; + } + } catch (err) { + ctx.logger.error(`Failed to get instance group from consul: ${err}`, { err }); + throw err; + } + } + + async getAllInstanceGroupNames(ctx: Context): Promise { + const res = await this.fetchRecursive(ctx, this.groupsPrefix); + if (!res) { + return []; + } + return Object.entries(res).map(([_k, v]) => v.Key.replace(this.groupsPrefix, '')); + } + + async getAllInstanceGroups(ctx: Context): Promise { + ctx.logger.debug('fetching consul k/v keys'); + const res = await this.client.kv.get({ key: this.groupsPrefix, recurse: true }); + ctx.logger.debug('received consul k/v keys', { res }); + if (!res) { + return []; + } + return Object.entries(res).map(([_k, v]) => JSON.parse(v.Value)); + } + + async upsertInstanceGroup(ctx: Context, group: InstanceGroup): Promise { + try { + await this.write(ctx, `${this.groupsPrefix}${group.name}`, JSON.stringify(group)); + return true; + } catch (err) { + ctx.logger.error(`Failed to upsert instance group into consul: ${err}`, { group: group.name, err }); + return false; + } + } + + async deleteInstanceGroup(ctx: Context, group: string): Promise { + try { + await this.delete(`${this.groupsPrefix}${group}`); + return; + } catch (err) { + ctx.logger.error(`Failed to delete instance group from consul: ${err}`, { group, err }); + return; + } + } + + async fetchInstanceStates(ctx: Context, group: string): Promise { + try { + const states = await this.client.kv.get({ key: `${this.groupsPrefix}${group}/states`, recurse: true }); + return Object.entries(states).map(([_k, v]) => JSON.parse(v.Value)); + } catch (err) { + ctx.logger.error(`Failed to get instance states from consul: ${err}`, { err }); + throw err; + } + } + + // TODO: implement this method + async filterOutAndTrimExpiredStates( + _ctx: Context, + _group: string, + states: InstanceState[], + ): Promise { + return states; + } + + async saveInstanceStatus(ctx: Context, group: string, state: InstanceState): Promise { + try { + await this.write(ctx, `${this.groupsPrefix}${group}/states/${state.instanceId}`, JSON.stringify(state)); + return true; + } catch (err) { + ctx.logger.error(`Failed to save instance state into consul: ${err}`, { group, state, err }); + return false; + } + } + + async fetchRecursive(ctx: Context, key: string): Promise { + try { + const v = await this.client.kv.get({ key, recurse: true }); + if (!v) { + return []; + } + const obj = Object.entries(v).map(([_k, v]) => v); + return obj; + } catch (err) { + ctx.logger.error(`Failed to read ${key} from consul: ${err}`, { err, key }); + throw err; + // return []; + } + } + + async fetchRecursiveTTLValues(ctx: Context, key: string, clean = true): Promise { + const values = {}; + (await this.fetchRecursive(ctx, key)).map((v) => { + values[v.Key.replace(`${key}/`, '')] = JSON.parse(v.Value); + }); + if (clean) { + const p: Promise[] = []; + Object.entries(values).map(([k, v]) => { + if (v.expires <= Date.now()) { + p.push(this.delete(k)); + delete values[k]; + } + }); + (await Promise.allSettled(p)).map((r) => { + if (r.status === 'rejected') { + ctx.logger.error(`Failed to delete key from consul: ${r.reason}`, { key: r.reason }); + } + }); + } + + return values; + } + + async fetchTTLValue(ctx: Context, key: string): Promise { + const v = await this.fetch(ctx, key); + if (v) { + const ttlv = JSON.parse(v.Value); + if (ttlv.expires > Date.now()) { + return ttlv; + } else { + return undefined; + } + } + return undefined; + } + + async fetch(ctx: Context, key: string): Promise { + ctx.logger.debug(`reading consul k/v key`, { key }); + const v = await this.client.kv.get(key); + ctx.logger.debug(`received consul k/v item`, { v }); + return v; + } + + async write(ctx: Context, key: string, value: string): Promise { + try { + const res = await this.client.kv.set(key, value); + if (!res) { + ctx.logger.error(`Failed to write to consul`, { key, value }); + } + return res; + } catch (err) { + ctx.logger.error(`Failed to write to consul: ${err}`, { key, err }); + return false; + } + } + + async writeTTLValue(ctx: Context, key: string, status: string, ttl: number): Promise { + return this.write(ctx, key, JSON.stringify({ status, expires: Date.now() + ttl * 1000 })); + } + + // save alongside a ttl with the timestamp after which the value is considered expired + async setValue(ctx: Context, key: string, value: string, ttl: number): Promise { + return this.writeTTLValue(ctx, this.valuesPrefix + key, value, ttl); + } + + // the value is considered expired if the timestamp is in the past + async checkValue(ctx: Context, key: string): Promise { + try { + const res = this.fetchTTLValue(ctx, key); + if (!res) { + return false; + } + return true; + } catch (err) { + return false; + } + } + + // save cloud instances + async saveCloudInstances(ctx: Context, group: string, instances: CloudInstance[]): Promise { + try { + await this.write(ctx, `${this.groupsPrefix}${group}/instances`, JSON.stringify(instances)); + return true; + } catch (err) { + ctx.logger.error(`Failed to save cloud instances into consul: ${err}`, { group, instances, err }); + return false; + } + } + + async existsAtLeastOneGroup(ctx: Context): Promise { + const res = await this.getAllInstanceGroups(ctx); + return res && res.length > 0; + } + + async delete(key: string): Promise { + await this.client.kv.del(key); + return true; + } +} diff --git a/src/group_report.ts b/src/group_report.ts index 299e9f8..60c784f 100644 --- a/src/group_report.ts +++ b/src/group_report.ts @@ -9,6 +9,7 @@ import { InstanceGroup, InstanceState, JibriStatusState } from './instance_store export interface InstanceReport { instanceId: string; displayName?: string; + group?: string; instanceName?: string; scaleStatus?: string; cloudStatus?: string; @@ -110,10 +111,10 @@ export default class GroupReportGenerator { groupReport.instances.push(instanceReport); }); - await this.addShutdownStatus(ctx, groupReport.instances); - await this.addShutdownConfirmations(ctx, groupReport.instances); - await this.addReconfigureDate(ctx, groupReport.instances); - await this.addShutdownProtectedStatus(ctx, groupReport.instances); + await this.addShutdownStatus(ctx, group.name, groupReport.instances); + await this.addShutdownConfirmations(ctx, group.name, groupReport.instances); + await this.addReconfigureDate(ctx, group.name, groupReport.instances); + await this.addShutdownProtectedStatus(ctx, group.name, groupReport.instances); groupReport.instances.forEach((instanceReport) => { if (this.isProvisioningOrRunningCloudInstance(instanceReport)) { @@ -187,6 +188,7 @@ export default class GroupReportGenerator { instanceStates.forEach((instanceState) => { const instanceReport = { instanceId: instanceState.instanceId, + group: group.name, displayName: 'unknown', instanceName: 'unknown', scaleStatus: 'unknown', @@ -285,9 +287,10 @@ export default class GroupReportGenerator { return instanceReports; } - private async addReconfigureDate(ctx: Context, instanceReports: InstanceReport[]): Promise { + private async addReconfigureDate(ctx: Context, group: string, instanceReports: InstanceReport[]): Promise { const reconfigureDates = await this.reconfigureManager.getReconfigureDates( ctx, + group, instanceReports.map((instanceReport) => { return instanceReport.instanceId; }), @@ -298,9 +301,10 @@ export default class GroupReportGenerator { } } - private async addShutdownStatus(ctx: Context, instanceReports: InstanceReport[]): Promise { + private async addShutdownStatus(ctx: Context, group: string, instanceReports: InstanceReport[]): Promise { const shutdownStatuses = await this.shutdownManager.getShutdownStatuses( ctx, + group, instanceReports.map((instanceReport) => { return instanceReport.instanceId; }), @@ -316,10 +320,15 @@ export default class GroupReportGenerator { }); } - private async addShutdownConfirmations(ctx: Context, instanceReports: InstanceReport[]): Promise { + private async addShutdownConfirmations( + ctx: Context, + group: string, + instanceReports: InstanceReport[], + ): Promise { ( await this.shutdownManager.getShutdownConfirmations( ctx, + group, instanceReports.map((instanceReport) => { return instanceReport.instanceId; }), @@ -329,9 +338,14 @@ export default class GroupReportGenerator { }); } - private async addShutdownProtectedStatus(ctx: Context, instanceReports: InstanceReport[]): Promise { + private async addShutdownProtectedStatus( + ctx: Context, + group: string, + instanceReports: InstanceReport[], + ): Promise { const instanceReportsProtectedStatus: boolean[] = await this.shutdownManager.areScaleDownProtected( ctx, + group, instanceReports.map((instanceReport) => { return instanceReport.instanceId; }), diff --git a/src/handlers.ts b/src/handlers.ts index 20c7089..1a0e53b 100644 --- a/src/handlers.ts +++ b/src/handlers.ts @@ -135,8 +135,8 @@ class Handlers { statsCounter.inc(); try { const [shutdownStatus, reconfigureDate] = await Promise.all([ - this.shutdownManager.getShutdownStatus(req.context, details.instanceId), - this.reconfigureManager.getReconfigureDate(req.context, details.instanceId), + this.shutdownManager.getShutdownStatus(req.context, details.group, details.instanceId), + this.reconfigureManager.getReconfigureDate(req.context, details.group, details.instanceId), ]); const sendResponse: SidecarResponse = { @@ -181,8 +181,12 @@ class Handlers { statsCounter.inc(); try { const [shutdownStatus, reconfigureDate] = await Promise.all([ - this.shutdownManager.getShutdownStatus(req.context, report.instance.instanceId), - this.reconfigureManager.getReconfigureDate(req.context, report.instance.instanceId), + this.shutdownManager.getShutdownStatus(req.context, report.instance.group, report.instance.instanceId), + this.reconfigureManager.getReconfigureDate( + req.context, + report.instance.group, + report.instance.instanceId, + ), ]); await this.reconfigureManager.processInstanceReport(req.context, report, reconfigureDate); @@ -205,8 +209,12 @@ class Handlers { statsCounter.inc(); try { const [shutdownStatus, reconfigureDate] = await Promise.all([ - this.shutdownManager.getShutdownStatus(req.context, report.instance.instanceId), - this.reconfigureManager.getReconfigureDate(req.context, report.instance.instanceId), + this.shutdownManager.getShutdownStatus(req.context, report.instance.group, report.instance.instanceId), + this.reconfigureManager.getReconfigureDate( + req.context, + report.instance.group, + report.instance.instanceId, + ), ]); let postReconfigureDate = reconfigureDate; @@ -239,7 +247,7 @@ class Handlers { const request: InstanceGroupDesiredValuesRequest = req.body; const lock: AutoscalerLock = await this.lockManager.lockGroup(req.context, req.params.name); try { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, req.params.name); if (instanceGroup) { if (request.desiredCount != null) { instanceGroup.scalingOptions.desiredCount = request.desiredCount; @@ -268,7 +276,7 @@ class Handlers { const lock: AutoscalerLock = await this.lockManager.lockGroup(req.context, req.params.name); try { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, req.params.name); if (instanceGroup) { if (scalingActivitiesRequest.enableAutoScale != null) { instanceGroup.enableAutoScale = scalingActivitiesRequest.enableAutoScale; @@ -298,7 +306,7 @@ class Handlers { } async reconfigureInstanceGroup(req: Request, res: Response): Promise { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, req.params.name); if (instanceGroup) { if (instanceGroup.enableReconfiguration) { // add audit item recording the request @@ -330,7 +338,7 @@ class Handlers { const instanceConfigurationUpdateRequest: InstanceConfigurationUpdateRequest = req.body; const lock: AutoscalerLock = await this.lockManager.lockGroup(req.context, req.params.name); try { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, req.params.name); if (instanceGroup) { instanceGroup.instanceConfigurationId = instanceConfigurationUpdateRequest.instanceConfigurationId; await this.instanceGroupManager.upsertInstanceGroup(req.context, instanceGroup); @@ -390,7 +398,7 @@ class Handlers { } async getInstanceGroup(req: Request, res: Response): Promise { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, req.params.name); if (instanceGroup) { res.status(200); @@ -415,7 +423,7 @@ class Handlers { async getGroupReport(req: Request, res: Response): Promise { const groupName = req.params.name; const ctx = req.context; - const group: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(groupName); + const group: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, groupName); if (group) { const groupReport = await this.groupReportGenerator.generateReport(ctx, group, null); res.status(200); @@ -490,7 +498,7 @@ class Handlers { scaleDownProtectedTTL, }); - const group = await this.instanceGroupManager.getInstanceGroup(groupName); + const group = await this.instanceGroupManager.getInstanceGroup(req.context, groupName); if (group) { if (requestBody.instanceConfigurationId != null) { group.instanceConfigurationId = requestBody.instanceConfigurationId; @@ -519,7 +527,7 @@ class Handlers { await this.instanceGroupManager.upsertInstanceGroup(req.context, group); await this.instanceGroupManager.setAutoScaleGracePeriod(req.context, group); - await this.instanceGroupManager.setScaleDownProtected(group); + await this.instanceGroupManager.setScaleDownProtected(req.context, group); req.context.logger.info( `Newly launched instances in group ${groupName} will be protected for ${scaleDownProtectedTTL} seconds`, @@ -539,7 +547,7 @@ class Handlers { const scalingOptionsRequest: InstanceGroupScalingOptionsRequest = req.body; const lock: AutoscalerLock = await this.lockManager.lockGroup(req.context, req.params.name); try { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(req.context, req.params.name); if (instanceGroup) { if (scalingOptionsRequest.scaleUpQuantity != null) { instanceGroup.scalingOptions.scaleUpQuantity = scalingOptionsRequest.scaleUpQuantity; diff --git a/src/instance_group.ts b/src/instance_group.ts index ce73647..03604ee 100644 --- a/src/instance_group.ts +++ b/src/instance_group.ts @@ -25,12 +25,11 @@ export default class InstanceGroupManager { this.getAllInstanceGroupNames = this.getAllInstanceGroupNames.bind(this); this.getAllInstanceGroups = this.getAllInstanceGroups.bind(this); this.upsertInstanceGroup = this.upsertInstanceGroup.bind(this); - this.existsAtLeastOneGroup = this.existsAtLeastOneGroup.bind(this); } async init(ctx: Context): Promise { ctx.logger.info('Initializing instance group manager...'); - const existsAtLeastOneGroup = await this.existsAtLeastOneGroup(); + const existsAtLeastOneGroup = await this.existsAtLeastOneGroup(ctx); if (!existsAtLeastOneGroup) { ctx.logger.info('Storing instance groups into instance store'); await Promise.all(this.initialGroupList.map((group) => this.upsertInstanceGroup(ctx, group))); @@ -42,16 +41,16 @@ export default class InstanceGroupManager { return this.initialGroupList; } - async existsAtLeastOneGroup(): Promise { - return this.instanceStore.existsAtLeastOneGroup(); + async existsAtLeastOneGroup(ctx: Context): Promise { + return this.instanceStore.existsAtLeastOneGroup(ctx); } async upsertInstanceGroup(ctx: Context, group: InstanceGroup): Promise { return this.instanceStore.upsertInstanceGroup(ctx, group); } - async getInstanceGroup(groupName: string): Promise { - return this.instanceStore.getInstanceGroup(groupName); + async getInstanceGroup(ctx: Context, groupName: string): Promise { + return this.instanceStore.getInstanceGroup(ctx, groupName); } async getAllInstanceGroupsAsMap(ctx: Context): Promise> { @@ -131,41 +130,41 @@ export default class InstanceGroupManager { } async allowAutoscaling(ctx: Context, group: string): Promise { - return this.instanceStore.checkValue(`autoScaleGracePeriod:${group}`); + return this.instanceStore.checkValue(ctx, `autoScaleGracePeriod:${group}`); } async setAutoScaleGracePeriod(ctx: Context, group: InstanceGroup): Promise { ctx.logger.info(`resetting autoscale grace period for group ${group.name}: ${group.gracePeriodTTLSec}`, { gracePeriodTTLSec: group.gracePeriodTTLSec, }); - return this.setValue(`autoScaleGracePeriod:${group.name}`, group.gracePeriodTTLSec); + return this.setValue(ctx, `autoScaleGracePeriod:${group.name}`, group.gracePeriodTTLSec); } - async setScaleDownProtected(group: InstanceGroup): Promise { - return this.setValue(`isScaleDownProtected:${group.name}`, group.protectedTTLSec); + async setScaleDownProtected(ctx: Context, group: InstanceGroup): Promise { + return this.setValue(ctx, `isScaleDownProtected:${group.name}`, group.protectedTTLSec); } - async isScaleDownProtected(group: string): Promise { - return this.instanceStore.checkValue(`isScaleDownProtected:${group}`); + async isScaleDownProtected(ctx: Context, group: string): Promise { + return this.instanceStore.checkValue(ctx, `isScaleDownProtected:${group}`); } - async isGroupJobsCreationAllowed(): Promise { - return this.instanceStore.checkValue('groupJobsCreationGracePeriod'); + async isGroupJobsCreationAllowed(ctx: Context): Promise { + return this.instanceStore.checkValue(ctx, 'groupJobsCreationGracePeriod'); } - async setGroupJobsCreationGracePeriod(): Promise { - return this.setValue(`groupJobsCreationGracePeriod`, this.processingIntervalSeconds); + async setGroupJobsCreationGracePeriod(ctx: Context): Promise { + return this.setValue(ctx, `groupJobsCreationGracePeriod`, this.processingIntervalSeconds); } - async isSanityJobsCreationAllowed(): Promise { - return this.instanceStore.checkValue('sanityJobsCreationGracePeriod'); + async isSanityJobsCreationAllowed(ctx: Context): Promise { + return this.instanceStore.checkValue(ctx, 'sanityJobsCreationGracePeriod'); } - async setSanityJobsCreationGracePeriod(): Promise { - return this.setValue(`sanityJobsCreationGracePeriod`, this.sanityJobsIntervalSeconds); + async setSanityJobsCreationGracePeriod(ctx: Context): Promise { + return this.setValue(ctx, `sanityJobsCreationGracePeriod`, this.sanityJobsIntervalSeconds); } - async setValue(key: string, ttl: number): Promise { - return this.instanceStore.setValue(key, 'false', ttl); + async setValue(ctx: Context, key: string, ttl: number): Promise { + return this.instanceStore.setValue(ctx, key, 'false', ttl); } } diff --git a/src/instance_launcher.ts b/src/instance_launcher.ts index 5f0a7ba..0944336 100644 --- a/src/instance_launcher.ts +++ b/src/instance_launcher.ts @@ -61,7 +61,7 @@ export default class InstanceLauncher { } async launchOrShutdownInstancesByGroup(ctx: Context, groupName: string): Promise { - const group = await this.instanceGroupManager.getInstanceGroup(groupName); + const group = await this.instanceGroupManager.getInstanceGroup(ctx, groupName); if (!group) { throw new Error(`Group ${groupName} not found, failed to make launch decisions.`); } @@ -115,7 +115,7 @@ export default class InstanceLauncher { ctx.logger.debug(`[Launcher] Scaling throttle disabled for group ${groupName}.`); } - const scaleDownProtected = await this.instanceGroupManager.isScaleDownProtected(group.name); + const scaleDownProtected = await this.instanceGroupManager.isScaleDownProtected(ctx, group.name); const scaleUpCount = await this.cloudManager.scaleUp( ctx, group, @@ -328,7 +328,7 @@ export default class InstanceLauncher { const desiredScaleDownQuantity = currentInventory.length - Math.max(group.scalingOptions.minDesired, group.scalingOptions.desiredCount); - const unprotectedInstances = await this.filterOutProtectedInstances(ctx, currentInventory); + const unprotectedInstances = await this.filterOutProtectedInstances(ctx, group, currentInventory); let listOfInstancesForScaleDown: InstanceDetails[] = []; switch (group.type) { @@ -369,9 +369,14 @@ export default class InstanceLauncher { return listOfInstancesForScaleDown; } - async filterOutProtectedInstances(ctx: Context, instanceDetails: InstanceState[]): Promise { + async filterOutProtectedInstances( + ctx: Context, + group: InstanceGroup, + instanceDetails: InstanceState[], + ): Promise { const protectedInstances: boolean[] = await this.shutdownManager.areScaleDownProtected( ctx, + group.name, instanceDetails.map((instance) => { return instance.instanceId; }), diff --git a/src/instance_store.ts b/src/instance_store.ts index 0a04fca..4f75ef0 100644 --- a/src/instance_store.ts +++ b/src/instance_store.ts @@ -133,9 +133,8 @@ export interface InstanceState { lastReconfigured?: string; } -interface InstanceStore { +export interface InstanceStore { // instance related methods - fetchInstanceGroups: { (): Promise }; fetchInstanceStates: { (ctx: Context, group: string): Promise }; saveInstanceStatus: { (ctx: Context, group: string, state: InstanceState): Promise }; filterOutAndTrimExpiredStates: { (ctx: Context, group: string, states: InstanceState[]): Promise }; @@ -144,35 +143,37 @@ interface InstanceStore { setShutdownStatus: { (ctx: Context, instanceDetails: InstanceDetails[], status: string, ttl: number): Promise; }; - getShutdownStatuses: { (ctx: Context, instanceIds: string[]): Promise }; - getShutdownConfirmations: { (ctx: Context, instanceIds: string[]): Promise<(string | false)[]> }; - getShutdownStatus: { (ctx: Context, instanceId: string): Promise }; - getShutdownConfirmation: { (ctx: Context, instanceId: string): Promise }; + getShutdownStatuses: { (ctx: Context, group: string, instanceIds: string[]): Promise }; + getShutdownConfirmations: { (ctx: Context, group: string, instanceIds: string[]): Promise<(string | false)[]> }; + getShutdownStatus: { (ctx: Context, group: string, instanceId: string): Promise }; + getShutdownConfirmation: { (ctx: Context, group: string, instanceId: string): Promise }; setShutdownConfirmation: { (ctx: Context, instanceDetails: InstanceDetails[], status: string, ttl: number): Promise; }; - setScaleDownProtected: { (ctx: Context, instanceId: string, protectedTTL: number, mode: string): Promise }; - areScaleDownProtected: { (ctx: Context, instanceIds: string[]): Promise }; + setScaleDownProtected: { + (ctx: Context, group: string, instanceId: string, protectedTTL: number, mode: string): Promise; + }; + areScaleDownProtected: { (ctx: Context, group: string, instanceIds: string[]): Promise }; // reconfigure related methods setReconfigureDate: { (ctx: Context, instanceDetails: InstanceDetails[], date: string, ttl: number): Promise; }; unsetReconfigureDate: { (ctx: Context, instanceId: string, group: string): Promise }; - getReconfigureDates: { (ctx: Context, instanceIds: string[]): Promise }; - getReconfigureDate: { (ctx: Context, instanceId: string): Promise }; + getReconfigureDates: { (ctx: Context, group: string, instanceIds: string[]): Promise }; + getReconfigureDate: { (ctx: Context, group: string, instanceId: string): Promise }; // group related methods - existsAtLeastOneGroup: { (): Promise }; + existsAtLeastOneGroup: { (ctx: Context): Promise }; upsertInstanceGroup: { (ctx: Context, group: InstanceGroup): Promise }; - getInstanceGroup: { (groupName: string): Promise }; + getInstanceGroup: { (ctx: Context, groupName: string): Promise }; getAllInstanceGroups: { (ctx: Context): Promise }; getAllInstanceGroupNames: { (ctx: Context): Promise }; deleteInstanceGroup: { (ctx: Context, groupName: string): Promise }; // key related methods - checkValue: { (key: string): Promise }; - setValue: { (key: string, value: string, ttl: number): Promise }; + checkValue: { (ctx: Context, key: string): Promise }; + setValue: { (ctx: Context, key: string, value: string, ttl: number): Promise }; // sanity related saveCloudInstances: { (ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise }; diff --git a/src/instance_tracker.ts b/src/instance_tracker.ts index e041a3e..e9e3e82 100644 --- a/src/instance_tracker.ts +++ b/src/instance_tracker.ts @@ -414,7 +414,7 @@ export class InstanceTracker { if (filterShutdown) { const filterShutdownStart = process.hrtime(); - const statesExceptShutDown = await this.filterOutInstancesShuttingDown(ctx, states); + const statesExceptShutDown = await this.filterOutInstancesShuttingDown(ctx, group, states); const filterShutdownEnd = process.hrtime(filterShutdownStart); ctx.logger.debug(`instance filtered states, with no shutdown instances: ${statesExceptShutDown}`, { group, @@ -449,13 +449,17 @@ export class InstanceTracker { return shutdownStatus; } - async filterOutInstancesShuttingDown(ctx: Context, states: InstanceState[]): Promise { + async filterOutInstancesShuttingDown( + ctx: Context, + group: string, + states: InstanceState[], + ): Promise { const instanceIds = states.map((state) => { return state.instanceId; }); - const shutdownStatuses = await this.shutdownManager.getShutdownStatuses(ctx, instanceIds); + const shutdownStatuses = await this.shutdownManager.getShutdownStatuses(ctx, group, instanceIds); - const shutdownConfirmations = await this.shutdownManager.getShutdownConfirmations(ctx, instanceIds); + const shutdownConfirmations = await this.shutdownManager.getShutdownConfirmations(ctx, group, instanceIds); const statesShutdownStatus: boolean[] = []; for (let i = 0; i < states.length; i++) { diff --git a/src/job_manager.ts b/src/job_manager.ts index c053b6f..b695200 100644 --- a/src/job_manager.ts +++ b/src/job_manager.ts @@ -245,7 +245,7 @@ export default class JobManager { } async createSanityProcessingJobs(ctx: context.Context): Promise { - if (!(await this.instanceGroupManager.isSanityJobsCreationAllowed())) { + if (!(await this.instanceGroupManager.isSanityJobsCreationAllowed(ctx))) { ctx.logger.info('[JobManager] Wait before allowing sanity job creation'); return; } @@ -259,7 +259,7 @@ export default class JobManager { } try { - if (!(await this.instanceGroupManager.isSanityJobsCreationAllowed())) { + if (!(await this.instanceGroupManager.isSanityJobsCreationAllowed(ctx))) { ctx.logger.info('[JobManager] Wait before allowing sanity job creation'); return; } @@ -274,7 +274,7 @@ export default class JobManager { this.sanityLoopProcessingTimeoutMs, ); - await this.instanceGroupManager.setSanityJobsCreationGracePeriod(); + await this.instanceGroupManager.setSanityJobsCreationGracePeriod(ctx); } catch (err) { ctx.logger.error(`[JobManager] Error while creating sanity jobs for group ${err}`); jobCreateFailureCounter.inc({ type: JobType.Sanity }); @@ -284,7 +284,7 @@ export default class JobManager { } async createGroupProcessingJobs(ctx: context.Context): Promise { - if (!(await this.instanceGroupManager.isGroupJobsCreationAllowed())) { + if (!(await this.instanceGroupManager.isGroupJobsCreationAllowed(ctx))) { ctx.logger.info('[JobManager] Wait before allowing job creation'); return; } @@ -298,7 +298,7 @@ export default class JobManager { } try { - if (!(await this.instanceGroupManager.isGroupJobsCreationAllowed())) { + if (!(await this.instanceGroupManager.isGroupJobsCreationAllowed(ctx))) { ctx.logger.info('[JobManager] Wait before allowing job creation'); return; } @@ -322,7 +322,7 @@ export default class JobManager { // populate some queue health metrics const healthCheckResult = await this.jobQueue.checkHealth(); await this.metricsLoop.saveMetricQueueWaiting(healthCheckResult.waiting); - await this.instanceGroupManager.setGroupJobsCreationGracePeriod(); + await this.instanceGroupManager.setGroupJobsCreationGracePeriod(ctx); } catch (err) { ctx.logger.error(`[JobManager] Error while creating jobs for group ${err}`); jobCreateFailureCounter.inc(); diff --git a/src/reconfigure_manager.ts b/src/reconfigure_manager.ts index a5d49a2..b68b37b 100644 --- a/src/reconfigure_manager.ts +++ b/src/reconfigure_manager.ts @@ -33,12 +33,12 @@ export default class ReconfigureManager { return save; } - async getReconfigureDates(ctx: Context, instanceIds: string[]): Promise { - return this.instanceStore.getReconfigureDates(ctx, instanceIds); + async getReconfigureDates(ctx: Context, group: string, instanceIds: string[]): Promise { + return this.instanceStore.getReconfigureDates(ctx, group, instanceIds); } - async getReconfigureDate(ctx: Context, instanceId: string): Promise { - return this.instanceStore.getReconfigureDate(ctx, instanceId); + async getReconfigureDate(ctx: Context, group: string, instanceId: string): Promise { + return this.instanceStore.getReconfigureDate(ctx, group, instanceId); } async processInstanceReport(ctx: Context, report: StatsReport, reconfigureDate: string): Promise { diff --git a/src/redis.ts b/src/redis.ts index 4f7357c..80c41b1 100644 --- a/src/redis.ts +++ b/src/redis.ts @@ -47,19 +47,21 @@ export default class RedisStore implements MetricsStore, InstanceStore { group: string, states: InstanceState[], ): Promise { - return this.doFilterOutAndTrimExpiredStates(ctx, this.getGroupInstancesStatesKey(group), states); + return this.doFilterOutAndTrimExpiredStates(ctx, group, states); } private async doFilterOutAndTrimExpiredStates( ctx: Context, - groupInstancesStatesKey: string, + group: string, instanceStates: InstanceState[], ): Promise { + const groupInstancesStatesKey = this.getGroupInstancesStatesKey(group); const groupInstancesStatesResponse = []; const deletePipeline = this.redisClient.pipeline(); const shutdownStatuses: boolean[] = await this.getShutdownStatuses( ctx, + group, instanceStates.map((instanceState) => { return instanceState.instanceId; }), @@ -158,7 +160,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return instanceStatesResponse; } - async fetchInstanceGroups(): Promise { + async fetchInstanceGroups(_ctx: Context): Promise { const groups = await this.redisClient.keys('instances:status:*'); return groups.map((group) => group.split(':')[2]); } @@ -246,7 +248,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return true; } - async getShutdownStatuses(ctx: Context, instanceIds: string[]): Promise { + async getShutdownStatuses(ctx: Context, _group: string, instanceIds: string[]): Promise { const pipeline = this.redisClient.pipeline(); instanceIds.forEach((instanceId) => { const key = this.shutDownKey(instanceId); @@ -263,7 +265,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { } } - async getShutdownConfirmations(ctx: Context, instanceIds: string[]): Promise<(string | false)[]> { + async getShutdownConfirmations(ctx: Context, _group: string, instanceIds: string[]): Promise<(string | false)[]> { const pipeline = this.redisClient.pipeline(); instanceIds.forEach((instanceId) => { const key = this.shutDownConfirmedKey(instanceId); @@ -284,7 +286,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { } } - async getShutdownStatus(ctx: Context, instanceId: string): Promise { + async getShutdownStatus(ctx: Context, _group: string, instanceId: string): Promise { const key = this.shutDownKey(instanceId); const res = await this.redisClient.get(key); ctx.logger.debug('Read shutdown status', { key, res }); @@ -319,17 +321,18 @@ export default class RedisStore implements MetricsStore, InstanceStore { async setScaleDownProtected( ctx: Context, + group: string, instanceId: string, protectedTTL: number, mode = 'isScaleDownProtected', ): Promise { const key = this.protectedKey(instanceId); - ctx.logger.debug('Writing protected mode', { key, mode }); + ctx.logger.debug('Writing protected mode', { group, key, mode }); await this.redisClient.set(key, mode, 'EX', protectedTTL); return true; } - async areScaleDownProtected(ctx: Context, instanceIds: string[]): Promise { + async areScaleDownProtected(ctx: Context, group: string, instanceIds: string[]): Promise { const pipeline = this.redisClient.pipeline(); instanceIds.forEach((instanceId) => { const key = this.protectedKey(instanceId); @@ -341,7 +344,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return instance[1] == 'isScaleDownProtected'; }); } else { - ctx.logger.error('ScaleDownProtected Failed in pipeline.exec()'); + ctx.logger.error('ScaleDownProtected Failed in pipeline.exec()', { group }); return []; } } @@ -373,7 +376,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return true; } - async getReconfigureDates(ctx: Context, instanceIds: string[]): Promise { + async getReconfigureDates(ctx: Context, group: string, instanceIds: string[]): Promise { const pipeline = this.redisClient.pipeline(); instanceIds.forEach((instanceId) => { const key = this.reconfigureKey(instanceId); @@ -385,7 +388,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return instance[1]; }); } else { - ctx.logger.error('ReconfigureDates Failed in pipeline.exec()'); + ctx.logger.error('ReconfigureDates Failed in pipeline.exec()', { group }); return []; } } @@ -397,7 +400,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return res; } - async existsAtLeastOneGroup(): Promise { + async existsAtLeastOneGroup(_ctx: Context): Promise { let cursor = '0'; do { const result = await this.redisClient.hscan( @@ -435,7 +438,7 @@ export default class RedisStore implements MetricsStore, InstanceStore { return true; } - async getInstanceGroup(groupName: string): Promise { + async getInstanceGroup(_ctx: Context, groupName: string): Promise { const result = await this.redisClient.hget(this.GROUPS_HASH_NAME, groupName); if (result !== null && result.length > 0) { return JSON.parse(result); @@ -502,11 +505,11 @@ export default class RedisStore implements MetricsStore, InstanceStore { ctx.logger.info(`Group ${groupName} is deleted`); } - async checkValue(key: string): Promise { + async checkValue(_ctx: Context, key: string): Promise { const result = await this.redisClient.get(key); return !(result !== null && result.length > 0); } - async setValue(key: string, value: string, ttl: number): Promise { + async setValue(_ctx: Context, key: string, value: string, ttl: number): Promise { const result = await this.redisClient.set(key, value, 'EX', ttl); if (result !== 'OK') { throw new Error(`unable to set ${key}`); diff --git a/src/sanity_loop.ts b/src/sanity_loop.ts index ad09a9c..4e6fbf9 100644 --- a/src/sanity_loop.ts +++ b/src/sanity_loop.ts @@ -34,7 +34,7 @@ export default class SanityLoop { } async reportUntrackedInstances(ctx: Context, groupName: string): Promise { - const group: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(groupName); + const group: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(ctx, groupName); if (group) { const cloudStart = process.hrtime(); ctx.logger.info(`Retrieving ${group.cloud} instances for ${groupName}`); diff --git a/src/scaling_options_manager.ts b/src/scaling_options_manager.ts index 05da9ba..16d4bbb 100644 --- a/src/scaling_options_manager.ts +++ b/src/scaling_options_manager.ts @@ -93,7 +93,7 @@ export default class ScalingManager { } try { - const instanceGroup = await this.instanceGroupManager.getInstanceGroup(group.name); + const instanceGroup = await this.instanceGroupManager.getInstanceGroup(ctx, group.name); if (instanceGroup) { ScalingManager.setNewScalingOptions(ctx, request.options, instanceGroup, request.direction); await this.instanceGroupManager.upsertInstanceGroup(ctx, instanceGroup); diff --git a/src/shutdown_manager.ts b/src/shutdown_manager.ts index 71eea30..86c4788 100644 --- a/src/shutdown_manager.ts +++ b/src/shutdown_manager.ts @@ -25,20 +25,20 @@ export default class ShutdownManager { return save; } - async getShutdownStatuses(ctx: Context, instanceIds: string[]): Promise { - return this.instanceStore.getShutdownStatuses(ctx, instanceIds); + async getShutdownStatuses(ctx: Context, group: string, instanceIds: string[]): Promise { + return this.instanceStore.getShutdownStatuses(ctx, group, instanceIds); } - async getShutdownConfirmations(ctx: Context, instanceIds: string[]): Promise<(string | false)[]> { - return this.instanceStore.getShutdownConfirmations(ctx, instanceIds); + async getShutdownConfirmations(ctx: Context, group: string, instanceIds: string[]): Promise<(string | false)[]> { + return this.instanceStore.getShutdownConfirmations(ctx, group, instanceIds); } - async getShutdownStatus(ctx: Context, instanceId: string): Promise { - return this.instanceStore.getShutdownStatus(ctx, instanceId); + async getShutdownStatus(ctx: Context, group: string, instanceId: string): Promise { + return this.instanceStore.getShutdownStatus(ctx, group, instanceId); } - async getShutdownConfirmation(ctx: Context, instanceId: string): Promise { - return this.instanceStore.getShutdownConfirmation(ctx, instanceId); + async getShutdownConfirmation(ctx: Context, group: string, instanceId: string): Promise { + return this.instanceStore.getShutdownConfirmation(ctx, group, instanceId); } async setShutdownConfirmation( @@ -53,14 +53,15 @@ export default class ShutdownManager { async setScaleDownProtected( ctx: Context, + group: string, instanceId: string, protectedTTL: number, mode = 'isScaleDownProtected', ): Promise { - return this.instanceStore.setScaleDownProtected(ctx, instanceId, protectedTTL, mode); + return this.instanceStore.setScaleDownProtected(ctx, group, instanceId, protectedTTL, mode); } - async areScaleDownProtected(ctx: Context, instanceIds: string[]): Promise { - return this.instanceStore.areScaleDownProtected(ctx, instanceIds); + async areScaleDownProtected(ctx: Context, group: string, instanceIds: string[]): Promise { + return this.instanceStore.areScaleDownProtected(ctx, group, instanceIds); } } diff --git a/src/test/consul.ts b/src/test/consul.ts new file mode 100644 index 0000000..cf025eb --- /dev/null +++ b/src/test/consul.ts @@ -0,0 +1,106 @@ +/* eslint-disable @typescript-eslint/ban-ts-comment */ +// @ts-nocheck +import AutoscalerLogger from '../logger'; +import assert from 'node:assert'; +import test, { afterEach, describe, mock } from 'node:test'; + +import ConsulClient, { ConsulOptions } from '../consul'; + +const asLogger = new AutoscalerLogger({ logLevel: 'debug' }); +const logger = asLogger.createLogger('debug'); + +const ctx = { logger }; +ctx.logger.debug = mock.fn(); +ctx.logger.error = mock.fn(); + +const mockClient = { + kv: { + get: mock.fn(), + set: mock.fn(), + del: mock.fn(), + }, + status: { + leader: mock.fn(), + }, + session: { + create: mock.fn(), + destroy: mock.fn(), + }, + agent: { + service: { + register: mock.fn(), + deregister: mock.fn(), + }, + }, +}; + +const options = { + host: 'localhost', + port: 8500, + secure: false, + groupsPrefix: '_test/autoscaler/groups/', + client: mockClient, +}; + +const client = new ConsulClient(options); + +const group = { + name: 'test', + type: 'test', + region: 'test', + environment: 'test', + enableScheduler: true, + tags: { + test: 'test', + }, +}; + +describe('ConsulClient', () => { + afterEach(() => { + mock.restoreAll(); + }); + + describe('testListInstanceGroups', () => { + test('will list all instance groups', async () => { + const res = await client.getAllInstanceGroups(ctx); + assert.strictEqual(res.length, 0); + }); + + test('will upsert a test group', async () => { + const res = await client.upsertInstanceGroup(ctx, group); + assert.strictEqual(res, true); + }); + + test('will find upserted group when listing all instance groups', async () => { + mockClient.kv.get.mock.mockImplementationOnce(() => { + return { + 0: { + Key: options.groupsPrefix + group.name, + Value: JSON.stringify(group), + }, + }; + }); + + const res = await client.getAllInstanceGroupNames(ctx); + assert.strictEqual(res.length, 1); + assert.strictEqual(res[0], group.name); + mockClient.kv.get.mock.mockImplementationOnce( + () => + { + Key: options.groupsPrefix + group.name, + Value: JSON.stringify(group), + }, + ); + + const res2 = await client.getInstanceGroup(ctx, group.name); + assert.deepEqual(res2, group); + }); + + test('will delete upserted test group', async () => { + await client.deleteInstanceGroup(ctx, group.name); + + const res = await client.getInstanceGroup(ctx, group.name); + assert.strictEqual(res, undefined); + }); + }); +}); diff --git a/src/test/validator.ts b/src/test/validator.ts index 34b7753..056b9b3 100644 --- a/src/test/validator.ts +++ b/src/test/validator.ts @@ -29,7 +29,7 @@ describe('Validator', () => { }; const shutdownManager = { - getShutdownConfirmations: mock.fn((_, instanceIds) => instanceIds.map(() => false)), + getShutdownConfirmations: mock.fn((_ctx, _group, instanceIds) => instanceIds.map(() => false)), }; const groupName = 'group'; diff --git a/src/validator.ts b/src/validator.ts index 26b6780..125ed94 100644 --- a/src/validator.ts +++ b/src/validator.ts @@ -44,7 +44,7 @@ export default class Validator { const instanceIds = instanceStates.map((v, _) => v.instanceId); - const shutdownConfirmations = await this.shutdownManager.getShutdownConfirmations(context, instanceIds); + const shutdownConfirmations = await this.shutdownManager.getShutdownConfirmations(context, name, instanceIds); return ( instanceStates.filter((v, i) => { @@ -62,8 +62,12 @@ export default class Validator { return desiredCount >= minDesired && desiredCount <= maxDesired && minDesired <= maxDesired; } - async groupHasValidDesiredInput(name: string, request: InstanceGroupDesiredValuesRequest): Promise { - const instanceGroup: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(name); + async groupHasValidDesiredInput( + ctx: Context, + name: string, + request: InstanceGroupDesiredValuesRequest, + ): Promise { + const instanceGroup: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(ctx, name); const minDesired = request.minDesired != null ? request.minDesired : instanceGroup.scalingOptions.minDesired; const maxDesired = request.maxDesired != null ? request.maxDesired : instanceGroup.scalingOptions.maxDesired; @@ -74,7 +78,10 @@ export default class Validator { } async canLaunchInstances(req: Request, count: number): Promise { - const instanceGroup: InstanceGroup = await this.instanceGroupManager.getInstanceGroup(req.params.name); + const instanceGroup: InstanceGroup = await this.instanceGroupManager.getInstanceGroup( + req.context, + req.params.name, + ); // take new maximum into consideration, if set let max; if (req.body.maxDesired != null) {