Skip to content

Commit

Permalink
feat(refactor): move to support different backends for metric and group storage (#161)
Browse files Browse the repository at this point in the history

* feat(refactor): move to support different backends for metric and group storage

- interfaces for MetricsStore and InstanceStore
- move redis-specific code to its own file
- initial prometheus support for metrics
- update tests

* further redis removal

* additional redis removal

* move group management

* further move of redis

* lock abstraction in preparation for consul locks

* change from Array<> syntax

* code review update
  • Loading branch information
aaronkvanmeerten authored Dec 2, 2024
1 parent f86427d commit 85afabf
Show file tree
Hide file tree
Showing 34 changed files with 1,655 additions and 809 deletions.
301 changes: 267 additions & 34 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
"express-unless": "^2.1.3",
"express-validator": "^7.0.1",
"got": "^11.8.6",
"ioredis": "^5.3.2",
"ioredis": "^5.4.1",
"jsonwebtoken": "^9.0.2",
"node-cache": "^5.1.2",
"oci-sdk": "^1.5.2",
"prom-client": "^15.0.0",
"prometheus-query": "^3.4.1",
"prometheus-remote-write": "^0.4.1",
"redis": "^3.1.2",
"redlock": "^4.2.0",
"redlock": "^5.0.0-beta.2",
"sha256": "^0.2.0",
"shortid": "^2.2.15",
"winston": "^3.11.0"
Expand All @@ -43,7 +45,7 @@
"@types/node-cache": "^4.2.5",
"@types/node-fetch": "^2.6.8",
"@types/redis": "^2.8.32",
"@types/redlock": "^4.0.6",
"@types/redlock": "^4.0.7",
"@types/sha256": "^0.2.1",
"@typescript-eslint/eslint-plugin": "^6.9.1",
"@typescript-eslint/parser": "^6.9.1",
Expand Down
89 changes: 61 additions & 28 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as context from './context';
import Handlers from './handlers';
import Validator from './validator';
import Redis, { RedisOptions } from 'ioredis';
import { RedisClient, ClientOpts } from 'redis';
import * as promClient from 'prom-client';
import AutoscalerLogger from './logger';
import shortid from 'shortid';
Expand All @@ -26,6 +25,10 @@ import { body, param, validationResult } from 'express-validator';
import SanityLoop from './sanity_loop';
import MetricsLoop from './metrics_loop';
import ScalingManager from './scaling_options_manager';
import RedisStore from './redis';
import PrometheusClient from './prometheus';
import MetricsStore from './metrics_store';
import InstanceStore from './instance_store';

//import { RequestTracker, RecorderRequestMeta } from './request_tracker';
//import * as meet from './meet_processor';
Expand All @@ -48,28 +51,68 @@ const redisOptions = <RedisOptions>{
host: config.RedisHost,
port: config.RedisPort,
};
const redisQueueOptions = <ClientOpts>{
host: config.RedisHost,
port: config.RedisPort,
};

if (config.RedisPassword) {
redisOptions.password = config.RedisPassword;
redisQueueOptions.password = config.RedisPassword;
}

if (config.RedisTLS) {
redisOptions.tls = {};
redisQueueOptions.tls = {};
}

if (config.RedisDb) {
redisOptions.db = config.RedisDb;
redisQueueOptions.db = config.RedisDb;
}

const redisClient = new Redis(redisOptions);
const bareRedisClient = new RedisClient(redisQueueOptions);

let metricsStore: MetricsStore;

switch (config.MetricsStoreProvider) {
case 'prometheus':
metricsStore = new PrometheusClient({
logger,
endpoint: config.PrometheusURL,
});
break;
default:
// redis
metricsStore = new RedisStore({
redisClient,
redisScanCount: config.RedisScanCount,
idleTTL: config.IdleTTL,
metricTTL: config.MetricTTL,
provisioningTTL: config.ProvisioningTTL,
shutdownStatusTTL: config.ShutdownStatusTTL,
groupRelatedDataTTL: config.GroupRelatedDataTTL,
serviceLevelMetricsTTL: config.ServiceLevelMetricsTTL,
});
break;
}

let instanceStore: InstanceStore;

switch (config.InstanceStoreProvider) {
// case 'consul':
// instanceStore = new ConsulClient({
// logger,
// endpoint: config.ConsulURL,
// });
// break;
default:
// redis
instanceStore = new RedisStore({
redisClient,
redisScanCount: config.RedisScanCount,
idleTTL: config.IdleTTL,
metricTTL: config.MetricTTL,
provisioningTTL: config.ProvisioningTTL,
shutdownStatusTTL: config.ShutdownStatusTTL,
groupRelatedDataTTL: config.GroupRelatedDataTTL,
serviceLevelMetricsTTL: config.ServiceLevelMetricsTTL,
});
break;
}

mapp.get('/health', (req: express.Request, res: express.Response) => {
logger.debug('Health check');
Expand All @@ -95,27 +138,22 @@ const audit = new Audit({
});

const shutdownManager = new ShutdownManager({
redisClient,
instanceStore,
shutdownTTL: config.ShutDownTTL,
audit,
});

const reconfigureManager = new ReconfigureManager({
redisClient,
instanceStore,
reconfigureTTL: config.ReconfigureTTL,
audit,
});

const instanceTracker = new InstanceTracker({
redisClient,
redisScanCount: config.RedisScanCount,
metricsStore,
instanceStore,
shutdownManager,
audit,
idleTTL: config.IdleTTL,
metricTTL: config.MetricTTL,
provisioningTTL: config.ProvisioningTTL,
shutdownStatusTTL: config.ShutdownStatusTTL,
groupRelatedDataTTL: config.GroupRelatedDataTTL,
});

const cloudManager = new CloudManager({
Expand All @@ -133,14 +171,13 @@ const cloudManager = new CloudManager({
});

const lockManager: LockManager = new LockManager(logger, {
redisClient: bareRedisClient,
redisClient,
jobCreationLockTTL: config.JobsCreationLockTTLMs,
groupLockTTLMs: config.GroupLockTTLMs,
});

const instanceGroupManager = new InstanceGroupManager({
redisClient,
redisScanCount: config.RedisScanCount,
instanceStore,
initialGroupList: config.GroupList,
groupJobsCreationGracePeriod: config.GroupJobsCreationGracePeriodSec,
sanityJobsCreationGracePeriod: config.SanityJobsCreationGracePeriodSec,
Expand All @@ -159,10 +196,8 @@ instanceGroupManager.init(initCtx).catch((err) => {

const autoscaleProcessor = new AutoscaleProcessor({
instanceTracker,
cloudManager,
instanceGroupManager,
lockManager,
redisClient,
audit,
});

Expand All @@ -179,8 +214,6 @@ const instanceLauncher = new InstanceLauncher({
instanceTracker,
cloudManager,
instanceGroupManager,
lockManager,
redisClient,
shutdownManager,
audit,
metricsLoop,
Expand All @@ -194,8 +227,8 @@ const groupReportGenerator = new GroupReportGenerator({
});

const sanityLoop = new SanityLoop({
redisClient,
metricsTTL: config.ServiceLevelMetricsTTL,
metricsStore,
instanceStore,
cloudManager,
reportExtCallRetryStrategy: {
maxTimeInSeconds: config.ReportExtCallMaxTimeInSeconds,
Expand All @@ -210,7 +243,7 @@ const sanityLoop = new SanityLoop({
// Bee-Queue also uses a different Redis library, so we map redisOptions to the object expected by Bee-Queue
const jobManager = new JobManager({
logger,
queueRedisOptions: redisQueueOptions,
queueRedisOptions: { host: config.RedisHost, port: config.RedisPort, password: config.RedisPassword },
lockManager,
instanceGroupManager,
instanceLauncher,
Expand Down
26 changes: 13 additions & 13 deletions src/audit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Redis } from 'ioredis';
import { InstanceDetails, InstanceState } from './instance_tracker';
import { Context } from './context';
import { InstanceDetails, InstanceState } from './instance_store';

export interface InstanceAudit {
instanceId: string;
Expand All @@ -23,7 +23,7 @@ export interface AutoScalerActionItem {
count: number;
oldDesiredCount: number;
newDesiredCount: number;
scaleMetrics: Array<number>;
scaleMetrics: number[];
}

export interface LauncherActionItem {
Expand All @@ -38,7 +38,7 @@ export interface GroupAuditResponse {
lastLauncherRun: string;
lastAutoScalerRun: string;
lastReconfigureRequest: string;
lastScaleMetrics?: Array<number>;
lastScaleMetrics?: number[];
autoScalerActionItems?: AutoScalerActionItem[];
launcherActionItems?: LauncherActionItem[];
}
Expand Down Expand Up @@ -114,7 +114,7 @@ export default class Audit {
return this.setInstanceValue(`audit:${groupName}:${instanceId}:request-to-launch`, value, this.auditTTL);
}

async saveShutdownEvents(instanceDetails: Array<InstanceDetails>): Promise<void> {
async saveShutdownEvents(instanceDetails: InstanceDetails[]): Promise<void> {
const pipeline = this.redisClient.pipeline();
for (const instance of instanceDetails) {
const value: InstanceAudit = {
Expand All @@ -132,7 +132,7 @@ export default class Audit {
await pipeline.exec();
}

async saveShutdownConfirmationEvents(instanceDetails: Array<InstanceDetails>): Promise<void> {
async saveShutdownConfirmationEvents(instanceDetails: InstanceDetails[]): Promise<void> {
const pipeline = this.redisClient.pipeline();
for (const instance of instanceDetails) {
const value: InstanceAudit = {
Expand Down Expand Up @@ -164,7 +164,7 @@ export default class Audit {
);
}

async saveReconfigureEvents(instanceDetails: Array<InstanceDetails>): Promise<void> {
async saveReconfigureEvents(instanceDetails: InstanceDetails[]): Promise<void> {
const pipeline = this.redisClient.pipeline();
for (const instance of instanceDetails) {
const value: InstanceAudit = {
Expand Down Expand Up @@ -242,7 +242,7 @@ export default class Audit {
return updateResponse;
}

async updateLastAutoScalerRun(ctx: Context, groupName: string, scaleMetrics: Array<number>): Promise<boolean> {
async updateLastAutoScalerRun(ctx: Context, groupName: string, scaleMetrics: number[]): Promise<boolean> {
const updateLastAutoScalerStart = process.hrtime();

// Extend the TTL long enough that the key is deleted only after the group is deleted or no action is performed on it
Expand Down Expand Up @@ -315,7 +315,7 @@ export default class Audit {
}

async generateInstanceAudit(ctx: Context, groupName: string): Promise<InstanceAuditResponse[]> {
const instanceAudits: Array<InstanceAudit> = await this.getInstanceAudit(ctx, groupName);
const instanceAudits = await this.getInstanceAudit(ctx, groupName);
instanceAudits.sort((a, b) => (a.timestamp > b.timestamp ? 1 : -1));

const instanceAuditResponseList: InstanceAuditResponse[] = [];
Expand Down Expand Up @@ -364,7 +364,7 @@ export default class Audit {
}

async generateGroupAudit(ctx: Context, groupName: string): Promise<GroupAuditResponse> {
const groupAudits: Array<GroupAudit> = await this.getGroupAudit(ctx, groupName);
const groupAudits = await this.getGroupAudit(ctx, groupName);

const groupAuditResponse: GroupAuditResponse = {
lastLauncherRun: 'unknown',
Expand Down Expand Up @@ -413,8 +413,8 @@ export default class Audit {
return groupAuditResponse;
}

async getInstanceAudit(ctx: Context, groupName: string): Promise<Array<InstanceAudit>> {
const audit: Array<InstanceAudit> = [];
async getInstanceAudit(ctx: Context, groupName: string): Promise<InstanceAudit[]> {
const audit = <InstanceAudit[]>[];

let cursor = '0';
do {
Expand Down Expand Up @@ -447,8 +447,8 @@ export default class Audit {
return audit;
}

async getGroupAudit(ctx: Context, groupName: string): Promise<Array<GroupAudit>> {
const audit: Array<GroupAudit> = [];
async getGroupAudit(ctx: Context, groupName: string): Promise<GroupAudit[]> {
const audit = <GroupAudit[]>[];

const groupAuditStart = process.hrtime();
const items: string[] = await this.redisClient.zrange(this.getGroupAuditActionsKey(groupName), 0, -1);
Expand Down
Loading

0 comments on commit 85afabf

Please sign in to comment.