Skip to content

Commit

Permalink
feat(prometheus): updates for better testing (#162)
Browse files Browse the repository at this point in the history
* feat(prometheus): refactor driver and writer

uses context logger instead of local logger

* fix instance tracker tests to be provider-agnostic

* move mock store to own container
  • Loading branch information
aaronkvanmeerten authored Dec 4, 2024
1 parent 339305d commit 7088058
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 153 deletions.
1 change: 0 additions & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ let metricsStore: MetricsStore;
switch (config.MetricsStoreProvider) {
case 'prometheus':
metricsStore = new PrometheusClient({
logger,
endpoint: config.PrometheusURL,
});
break;
Expand Down
2 changes: 1 addition & 1 deletion src/instance_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ interface InstanceStore {
setValue: { (key: string, value: string, ttl: number): Promise<boolean> };

// sanity related
saveCloudInstances: { (groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> };
saveCloudInstances: { (ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> };
}

export default InstanceStore;
2 changes: 1 addition & 1 deletion src/metrics_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ interface MetricsStore {
(ctx: Context, group: string, item: InstanceMetric): Promise<boolean>;
};
cleanInstanceMetrics: { (ctx: Context, group: string): Promise<boolean> };
saveMetricUnTrackedCount: { (groupName: string, count: number): Promise<boolean> };
saveMetricUnTrackedCount: { (ctx: Context, groupName: string, count: number): Promise<boolean> };
}

export default MetricsStore;
100 changes: 49 additions & 51 deletions src/prometheus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as promClient from 'prom-client';
import { pushMetrics, Result, Options } from 'prometheus-remote-write';
import { PrometheusDriver, QueryResult } from 'prometheus-query';
import { Logger } from 'winston';
import MetricsStore, { InstanceMetric } from './metrics_store';
import { Context } from './context';

Expand All @@ -14,20 +13,17 @@ export interface PromMetrics {
}

export interface PrometheusOptions {
logger: Logger;
endpoint: string;
baseURL?: string;
promDriver?: PrometheusDriver;
promWriter?: PrometheusWriter;
}

/**
 * One sample point of a Prometheus range-query series, as iterated over in
 * `fetchInstanceMetrics` when converting query results to instance metrics.
 */
interface PromQueryValue {
// Sample time. NOTE(review): typed as string here — confirm the exact format
// the query driver returns before parsing or comparing it.
time: string;
// Numeric sample value for that time.
value: number;
}

interface PrometheusWriter {
pushMetrics: (metrics: PromMetrics, options: Options) => Promise<Result>;
}

//metrics for prometheus query
const promQueryErrors = new promClient.Counter({
name: 'autoscaler_prom_query_errors',
Expand Down Expand Up @@ -60,88 +56,95 @@ const promWriteSum = new promClient.Counter({
help: 'Sum of timings for high level prometheus remote write',
});

/**
 * Thin wrapper around `pushMetrics` from prometheus-remote-write that pins the
 * remote-write URL and request headers, so callers only supply the metrics and
 * the labels to attach to them.
 */
export class PrometheusWriter {
  private readonly url: string;

  /**
   * @param url Absolute remote-write endpoint. The default carries an explicit
   *   `http://` scheme: without one, `localhost:` is parsed as the URL scheme
   *   and the request is rejected before it is ever sent.
   */
  constructor(url = 'http://localhost:9090/api/v1/write') {
    this.url = url;
  }

  /**
   * Pushes `metrics`, tagged with `labels`, to the configured endpoint.
   *
   * @param metrics Metric name/value pairs to write.
   * @param labels Labels applied to the pushed metrics.
   * @returns The result produced by `pushMetrics`; a rejection from it is not
   *   caught here and propagates to the caller.
   */
  async pushMetrics(metrics: PromMetrics, labels: PromLabels): Promise<Result> {
    const options = {
      url: this.url,
      labels,
      // verbose: true,
      headers: { 'Content-Type': 'application/x-protobuf' },
    } as Options;
    return pushMetrics(metrics, options);
  }
}

export default class PrometheusClient implements MetricsStore {
private logger: Logger;
private endpoint: string;
private baseURL = '/api/v1';
private writeURL = '/api/v1/write';

private promDriver: PrometheusDriver;
private promWriter: PrometheusWriter;

constructor(options: PrometheusOptions) {
this.logger = options.logger;
this.endpoint = options.endpoint;
if (options.baseURL) {
this.baseURL = options.baseURL;
}
if (options.promDriver) {
this.promDriver = options.promDriver;
} else {
this.promDriver = new PrometheusDriver({
endpoint: this.endpoint,
baseURL: this.baseURL,
});
}
if (options.promWriter) {
this.promWriter = options.promWriter;
} else {
this.promWriter = new PrometheusWriter(this.endpoint + this.writeURL);
}
}

prometheusDriver(): PrometheusDriver {
return new PrometheusDriver({
endpoint: this.endpoint,
baseURL: this.baseURL,
});
}

prometheusWriter(): PrometheusWriter {
return {
pushMetrics(metrics: PromMetrics, options: Options): Promise<Result> {
return pushMetrics(metrics, options);
},
};
}

public async prometheusRangeQuery(query: string, driver = <PrometheusDriver>{}): Promise<QueryResult> {
if (!driver) driver = this.prometheusDriver();
public async prometheusRangeQuery(ctx: Context, query: string): Promise<QueryResult> {
const start = new Date().getTime() - 1 * 60 * 60 * 1000;
const end = new Date();
const step = 60; // 1 point every minute
try {
const qStart = process.hrtime();
const res = await driver.rangeQuery(query, start, end, step);
const res = await this.promDriver.rangeQuery(query, start, end, step);
const qEnd = process.hrtime(qStart);
promQueryCount.inc();
promQuerySum.inc(qEnd[0] * 1000 + qEnd[1] / 1000000);

return res;
} catch (err) {
promQueryErrors.inc();
this.logger.error('Error querying Prometheus:', { query, err });
ctx.logger.error('Error querying Prometheus:', { query, err });
}
}

async pushMetric(metrics: PromMetrics, labels: PromLabels, writer: PrometheusWriter): Promise<boolean> {
if (!writer) writer = this.prometheusWriter();
const pushUrl = this.endpoint + this.writeURL;
async pushMetric(ctx: Context, metrics: PromMetrics, labels: PromLabels): Promise<boolean> {
try {
const options = {
url: pushUrl,
labels,
// verbose: true,
headers: { 'Content-Type': 'application/x-protobuf' },
};
const pushStart = process.hrtime();
const res = await writer.pushMetrics(metrics, options);
const res = await this.promWriter.pushMetrics(metrics, labels);
const pushEnd = process.hrtime(pushStart);

if (res.status !== 204) {
promWriteErrors.inc();
this.logger.error('Returned status != 204 while pushing metrics to Prometheus:', res);
ctx.logger.error('Returned status != 204 while pushing metrics to Prometheus:', res);
} else {
promWriteCount.inc();
promWriteSum.inc(pushEnd[0] * 1000 + pushEnd[1] / 1000000);
return true;
}
} catch (err) {
promWriteErrors.inc();
this.logger.error('Error pushing metrics to Prometheus:', err);
ctx.logger.error('Error pushing metrics to Prometheus:', err);
}
return false;
}

async fetchInstanceMetrics(ctx: Context, group: string, driver = <PrometheusDriver>{}): Promise<InstanceMetric[]> {
async fetchInstanceMetrics(ctx: Context, group: string): Promise<InstanceMetric[]> {
const query = `autoscaler_instance_stress_level{group="${group}"}`;
const metricItems: InstanceMetric[] = [];
try {
const res = await this.prometheusRangeQuery(query, driver);
const res = await this.prometheusRangeQuery(ctx, query);
res.result.forEach((promItem) => {
promItem.values.forEach((v: PromQueryValue) => {
metricItems.push(<InstanceMetric>{
Expand All @@ -152,26 +155,21 @@ export default class PrometheusClient implements MetricsStore {
});
});
} catch (err) {
this.logger.error('Error fetching instance metrics:', { group, err });
ctx.logger.error('Error fetching instance metrics:', { group, err });
}
return metricItems;
}

async writeInstanceMetric(
ctx: Context,
group: string,
item: InstanceMetric,
writer = <PrometheusWriter>{},
): Promise<boolean> {
async writeInstanceMetric(ctx: Context, group: string, item: InstanceMetric): Promise<boolean> {
const labels = { instance: item.instanceId, group };
const metrics = { autoscaler_instance_stress_level: item.value };
return this.pushMetric(metrics, labels, writer);
return this.pushMetric(ctx, metrics, labels);
}

saveMetricUnTrackedCount(groupName: string, count: number, writer = <PrometheusWriter>{}): Promise<boolean> {
saveMetricUnTrackedCount(ctx: Context, groupName: string, count: number): Promise<boolean> {
const metrics = { autoscaler_untracked_instance_count: count };
const labels = { group: groupName };
return this.pushMetric(metrics, labels, writer);
return this.pushMetric(ctx, metrics, labels);
}

async cleanInstanceMetrics(_ctx: Context, _group: string): Promise<boolean> {
Expand Down
5 changes: 3 additions & 2 deletions src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,17 @@ export default class RedisStore implements MetricsStore, InstanceStore {
return true;
}

async saveMetricUnTrackedCount(groupName: string, count: number): Promise<boolean> {
async saveMetricUnTrackedCount(ctx: Context, groupName: string, count: number): Promise<boolean> {
const key = `service-metrics:${groupName}:untracked-count`;
const result = await this.redisClient.set(key, JSON.stringify(count), 'EX', this.serviceLevelMetricsTTL);
if (result !== 'OK') {
ctx.logger.error('Error saving untracked count', { key, count });
throw new Error(`unable to set ${key}`);
}
return true;
}

async saveCloudInstances(groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> {
async saveCloudInstances(_ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> {
await this.redisClient.set(
`cloud-instances-list:${groupName}`,
JSON.stringify(cloudInstances),
Expand Down
12 changes: 6 additions & 6 deletions src/sanity_loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ export default class SanityLoop {
} ms`,
);
ctx.logger.debug(`Retrieved ${group.cloud} instance details for ${groupName}`, { cloudInstances });
await this.saveCloudInstances(group.name, cloudInstances);
await this.saveCloudInstances(ctx, group.name, cloudInstances);

const groupReportStart = process.hrtime();
const groupReport = await this.groupReportGenerator.generateReport(ctx, group, cloudInstances);
const groupReportEnd = process.hrtime(groupReportStart);
ctx.logger.info(`Retrieved group report in ${groupReportEnd[0] * 1000 + groupReportEnd[1] / 1000000} ms`);

await this.saveMetricUnTrackedCount(groupName, groupReport.unTrackedCount);
await this.saveMetricUnTrackedCount(ctx, groupName, groupReport.unTrackedCount);
ctx.logger.info(
`Successfully saved cloud instances and untracked count ${groupReport.unTrackedCount} for ${groupName}`,
);
Expand All @@ -65,11 +65,11 @@ export default class SanityLoop {
}
}

async saveMetricUnTrackedCount(groupName: string, count: number): Promise<boolean> {
return this.metricsStore.saveMetricUnTrackedCount(groupName, count);
async saveMetricUnTrackedCount(ctx: Context, groupName: string, count: number): Promise<boolean> {
return this.metricsStore.saveMetricUnTrackedCount(ctx, groupName, count);
}

private async saveCloudInstances(groupName: string, cloudInstances: CloudInstance[]) {
return this.instanceStore.saveCloudInstances(groupName, cloudInstances);
private async saveCloudInstances(ctx: Context, groupName: string, cloudInstances: CloudInstance[]) {
return this.instanceStore.saveCloudInstances(ctx, groupName, cloudInstances);
}
}
41 changes: 11 additions & 30 deletions src/test/instance_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import assert from 'node:assert';
import test, { afterEach, describe, mock } from 'node:test';

import { InstanceTracker } from '../instance_tracker';
import RedisStore from '../redis';
import { mockStore } from './mock_store';

describe('InstanceTracker', () => {
let context = {
Expand All @@ -17,17 +17,6 @@ describe('InstanceTracker', () => {
},
};

const redisClient = {
expire: mock.fn(),
zremrangebyscore: mock.fn(() => 0),
hgetall: mock.fn(),
hset: mock.fn(),
hdel: mock.fn(),
del: mock.fn(),
scan: mock.fn(),
zrange: mock.fn(),
};

const shutdownManager = {
shutdown: mock.fn(),
};
Expand Down Expand Up @@ -62,19 +51,11 @@ describe('InstanceTracker', () => {
},
};

const redisStore = new RedisStore({ redisClient });

const instanceTracker = new InstanceTracker({
instanceStore: redisStore,
metricsStore: redisStore,
redisScanCount: 100,
instanceStore: mockStore,
metricsStore: mockStore,
shutdownManager,
audit,
idleTTL: 300,
metricTTL: 3600,
provisioningTTL: 900,
shutdownStatusTTL: 86400,
groupRelatedDataTTL: 172800,
});

afterEach(() => {
Expand Down Expand Up @@ -112,7 +93,7 @@ describe('InstanceTracker', () => {
hfGroupDetails.scalingOptions.scaleDownPeriodsCount,
hfGroupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);

const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
Expand All @@ -138,7 +119,7 @@ describe('InstanceTracker', () => {
hfGroupDetails.scalingOptions.scaleDownPeriodsCount,
hfGroupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);

const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
Expand All @@ -162,7 +143,7 @@ describe('InstanceTracker', () => {
hfGroupDetails.scalingOptions.scaleDownPeriodsCount,
hfGroupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);

const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
Expand All @@ -179,10 +160,10 @@ describe('InstanceTracker', () => {
});

assert.deepEqual(
context.logger.info.mock.calls[1].arguments[0],
context.logger.info.mock.calls[0].arguments[0],
`Filling in for missing metric from previous period`,
);
assert.deepEqual(context.logger.info.mock.calls[1].arguments[1], {
assert.deepEqual(context.logger.info.mock.calls[0].arguments[1], {
group: groupName,
instanceId: 'i-0a1b2c3d4e5f6g7h8',
periodIdx: 0,
Expand All @@ -206,7 +187,7 @@ describe('InstanceTracker', () => {
groupDetails.scalingOptions.scaleDownPeriodsCount,
groupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);
const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
groupDetails,
Expand Down Expand Up @@ -236,7 +217,7 @@ describe('InstanceTracker', () => {
groupDetails.scalingOptions.scaleDownPeriodsCount,
groupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);
const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
groupDetails,
Expand Down Expand Up @@ -266,7 +247,7 @@ describe('InstanceTracker', () => {
groupDetails.scalingOptions.scaleUpPeriodsCount,
);

redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);
const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
groupDetails,
Expand Down
Loading

0 comments on commit 7088058

Please sign in to comment.